http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index f846689..96de8e4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -22,17 +22,16 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.Locale;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
 import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
@@ -47,8 +46,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.PutTracker;
 import org.apache.hadoop.util.Progressable;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
@@ -65,7 +65,8 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class S3ABlockOutputStream extends OutputStream {
+class S3ABlockOutputStream extends OutputStream implements
+    StreamCapabilities {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(S3ABlockOutputStream.class);
@@ -87,14 +88,6 @@ class S3ABlockOutputStream extends OutputStream {
   private final ListeningExecutorService executorService;
 
   /**
-   * Retry policy for multipart commits; not all AWS SDK versions retry that.
-   */
-  private final RetryPolicy retryPolicy =
-      RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
-          5,
-          2000,
-          TimeUnit.MILLISECONDS);
-  /**
    * Factory for blocks.
    */
   private final S3ADataBlocks.BlockFactory blockFactory;
@@ -120,7 +113,12 @@ class S3ABlockOutputStream extends OutputStream {
   /**
    * Write operation helper; encapsulation of the filesystem operations.
    */
-  private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
+  private final WriteOperationHelper writeOperationHelper;
+
+  /**
+   * Track multipart put operation.
+   */
+  private final PutTracker putTracker;
 
   /**
    * An S3A output stream which uploads partitions in a separate pool of
@@ -138,6 +136,7 @@ class S3ABlockOutputStream extends OutputStream {
    * @param blockFactory factory for creating stream destinations
    * @param statistics stats for this stream
    * @param writeOperationHelper state of the write operation.
+   * @param putTracker put tracking for commit support
    * @throws IOException on any problem
    */
   S3ABlockOutputStream(S3AFileSystem fs,
@@ -147,7 +146,8 @@ class S3ABlockOutputStream extends OutputStream {
       long blockSize,
       S3ADataBlocks.BlockFactory blockFactory,
       S3AInstrumentation.OutputStreamStatistics statistics,
-      S3AFileSystem.WriteOperationHelper writeOperationHelper)
+      WriteOperationHelper writeOperationHelper,
+      PutTracker putTracker)
       throws IOException {
     this.fs = fs;
     this.key = key;
@@ -155,6 +155,7 @@ class S3ABlockOutputStream extends OutputStream {
     this.blockSize = (int) blockSize;
     this.statistics = statistics;
     this.writeOperationHelper = writeOperationHelper;
+    this.putTracker = putTracker;
     Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
         "Block size is too small: %d", blockSize);
     this.executorService = MoreExecutors.listeningDecorator(executorService);
@@ -166,7 +167,11 @@ class S3ABlockOutputStream extends OutputStream {
     // writes a 0-byte entry.
     createBlockIfNeeded();
     LOG.debug("Initialized S3ABlockOutputStream for {}" +
-        " output to {}", writeOperationHelper, activeBlock);
+        " output to {}", key, activeBlock);
+    if (putTracker.initialize()) {
+      LOG.debug("Put tracker requests multipart upload");
+      initMultipartUpload();
+    }
   }
 
   /**
@@ -299,10 +304,7 @@ class S3ABlockOutputStream extends OutputStream {
   private synchronized void uploadCurrentBlock() throws IOException {
     Preconditions.checkState(hasActiveBlock(), "No active block");
     LOG.debug("Writing block # {}", blockCount);
-    if (multiPartUpload == null) {
-      LOG.debug("Initiating Multipart upload");
-      multiPartUpload = new MultiPartUpload();
-    }
+    initMultipartUpload();
     try {
       multiPartUpload.uploadBlockAsync(getActiveBlock());
       bytesSubmitted += getActiveBlock().dataSize();
@@ -313,6 +315,20 @@ class S3ABlockOutputStream extends OutputStream {
   }
 
   /**
+   * Init multipart upload. Assumption: this is called from
+   * a synchronized block.
+   * Note that this makes a blocking HTTPS request to the far end, so
+   * can take time and potentially fail.
+   * @throws IOException failure to initialize the upload
+   */
+  private void initMultipartUpload() throws IOException {
+    if (multiPartUpload == null) {
+      LOG.debug("Initiating Multipart upload");
+      multiPartUpload = new MultiPartUpload(key);
+    }
+  }
+
+  /**
    * Close the stream.
    *
    * This will not return until the upload is complete
@@ -342,22 +358,35 @@ class S3ABlockOutputStream extends OutputStream {
           // This must happen even if there is no data, so that 0 byte files
           // are created.
           bytes = putObject();
+          bytesSubmitted = bytes;
         }
       } else {
-        // there has already been at least one block scheduled for upload;
-        // put up the current then wait
-        if (hasBlock && block.hasData()) {
+        // there's an MPU in progress;
+        // IF there is more data to upload, or no data has yet been uploaded,
+        // PUT the final block
+        if (hasBlock &&
+            (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
           //send last part
           uploadCurrentBlock();
         }
         // wait for the partial uploads to finish
         final List<PartETag> partETags =
             multiPartUpload.waitForAllPartUploads();
-        // then complete the operation
-        multiPartUpload.complete(partETags);
         bytes = bytesSubmitted;
+        // then complete the operation
+        if (putTracker.aboutToComplete(multiPartUpload.getUploadId(),
+            partETags,
+            bytes)) {
+          multiPartUpload.complete(partETags);
+        } else {
+          LOG.info("File {} will be visible when the job is committed", key);
+        }
+      }
+      if (!putTracker.outputImmediatelyVisible()) {
+        // track the number of bytes uploaded as commit operations.
+        statistics.commitUploaded(bytes);
       }
-      LOG.debug("Upload complete for {}", writeOperationHelper);
+      LOG.debug("Upload complete to {} by {}", key, writeOperationHelper);
     } catch (IOException ioe) {
       writeOperationHelper.writeFailed(ioe);
       throw ioe;
@@ -367,7 +396,7 @@ class S3ABlockOutputStream extends OutputStream {
       closeAll(LOG, statistics);
       clearActiveBlock();
     }
-    // All end of write operations, including deleting fake parent directories
+    // Note end of write. This does not change the state of the remote FS.
     writeOperationHelper.writeSuccessful(bytes);
   }
 
@@ -387,8 +416,9 @@ class S3ABlockOutputStream extends OutputStream {
     int size = block.dataSize();
     final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
     final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
-        writeOperationHelper.newPutRequest(uploadData.getFile()) :
-        writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size);
+        writeOperationHelper.createPutObjectRequest(key, uploadData.getFile())
+        : writeOperationHelper.createPutObjectRequest(key,
+            uploadData.getUploadStream(), size);
     long transferQueueTime = now();
     BlockUploadProgress callback =
         new BlockUploadProgress(
@@ -396,18 +426,13 @@ class S3ABlockOutputStream extends OutputStream {
     putObjectRequest.setGeneralProgressListener(callback);
     statistics.blockUploadQueued(size);
     ListenableFuture<PutObjectResult> putObjectResult =
-        executorService.submit(new Callable<PutObjectResult>() {
-          @Override
-          public PutObjectResult call() throws Exception {
-            PutObjectResult result;
-            try {
-              // the putObject call automatically closes the input
-              // stream afterwards.
-              result = writeOperationHelper.putObject(putObjectRequest);
-            } finally {
-              closeAll(LOG, uploadData, block);
-            }
-            return result;
+        executorService.submit(() -> {
+          try {
+            // the putObject call automatically closes the input
+            // stream afterwards.
+            return writeOperationHelper.putObject(putObjectRequest);
+          } finally {
+            closeAll(LOG, uploadData, block);
           }
         });
     clearActiveBlock();
@@ -460,20 +485,82 @@ class S3ABlockOutputStream extends OutputStream {
   }
 
   /**
+   * Return the stream capabilities.
+   * This stream always returns false when queried about hflush and hsync.
+   * If asked about {@link CommitConstants#STREAM_CAPABILITY_MAGIC_OUTPUT}
+   * it will return true iff this is an active "magic" output stream.
+   * @param capability string to query the stream support for.
+   * @return true if the capability is supported by this instance.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    switch (capability.toLowerCase(Locale.ENGLISH)) {
+
+      // does the output stream have delayed visibility
+    case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
+      return !putTracker.outputImmediatelyVisible();
+
+      // The flush/sync options are absolutely not supported
+    case "hflush":
+    case "hsync":
+      return false;
+
+    default:
+      return false;
+    }
+  }
+
+  /**
    * Multiple partition upload.
    */
   private class MultiPartUpload {
     private final String uploadId;
     private final List<ListenableFuture<PartETag>> partETagsFutures;
+    private int partsSubmitted;
+    private int partsUploaded;
+    private long bytesSubmitted;
 
-    MultiPartUpload() throws IOException {
-      this.uploadId = writeOperationHelper.initiateMultiPartUpload();
+    MultiPartUpload(String key) throws IOException {
+      this.uploadId = writeOperationHelper.initiateMultiPartUpload(key);
       this.partETagsFutures = new ArrayList<>(2);
       LOG.debug("Initiated multi-part upload for {} with " +
           "id '{}'", writeOperationHelper, uploadId);
     }
 
     /**
+     * Get a count of parts submitted.
+     * @return the number of parts submitted; will always be >= the
+     * value of {@link #getPartsUploaded()}
+     */
+    public int getPartsSubmitted() {
+      return partsSubmitted;
+    }
+
+    /**
+     * Count of parts actually uploaded.
+     * @return the count of successfully completed part uploads.
+     */
+    public int getPartsUploaded() {
+      return partsUploaded;
+    }
+
+    /**
+     * Get the upload ID; expected to be non-null once construction completes.
+     * @return the upload ID
+     */
+    public String getUploadId() {
+      return uploadId;
+    }
+
+    /**
+     * Get the count of bytes submitted.
+     * @return the current upload size.
+     */
+    public long getBytesSubmitted() {
+      return bytesSubmitted;
+    }
+
+    /**
      * Upload a block of data.
      * This will take the block
      * @param block block to upload
@@ -481,17 +568,22 @@ class S3ABlockOutputStream extends OutputStream {
      */
     private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
         throws IOException {
-      LOG.debug("Queueing upload of {}", block);
+      LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
+      Preconditions.checkNotNull(uploadId, "Null uploadId");
+      partsSubmitted++;
       final int size = block.dataSize();
+      bytesSubmitted += size;
       final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
       final int currentPartNumber = partETagsFutures.size() + 1;
       final UploadPartRequest request =
           writeOperationHelper.newUploadPartRequest(
+              key,
               uploadId,
               currentPartNumber,
               size,
               uploadData.getUploadStream(),
-              uploadData.getFile());
+              uploadData.getFile(),
+              0L);
 
       long transferQueueTime = now();
       BlockUploadProgress callback =
@@ -500,24 +592,22 @@ class S3ABlockOutputStream extends OutputStream {
       request.setGeneralProgressListener(callback);
       statistics.blockUploadQueued(block.dataSize());
       ListenableFuture<PartETag> partETagFuture =
-          executorService.submit(new Callable<PartETag>() {
-            @Override
-            public PartETag call() throws Exception {
-              // this is the queued upload operation
-              LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
-                  uploadId);
-              // do the upload
-              PartETag partETag;
-              try {
-                partETag = fs.uploadPart(request).getPartETag();
-                LOG.debug("Completed upload of {} to part {}", block,
-                    partETag.getETag());
-                LOG.debug("Stream statistics of {}", statistics);
-              } finally {
-                // close the stream and block
-                closeAll(LOG, uploadData, block);
-              }
+          executorService.submit(() -> {
+            // this is the queued upload operation
+            // do the upload
+            try {
+              LOG.debug("Uploading part {} for id '{}'",
+                  currentPartNumber, uploadId);
+              PartETag partETag = writeOperationHelper.uploadPart(request)
+                  .getPartETag();
+              LOG.debug("Completed upload of {} to part {}",
+                  block, partETag.getETag());
+              LOG.debug("Stream statistics of {}", statistics);
+              partsUploaded++;
               return partETag;
+            } finally {
+              // close the stream and block
+              closeAll(LOG, uploadData, block);
             }
           });
       partETagsFutures.add(partETagFuture);
@@ -558,28 +648,18 @@ class S3ABlockOutputStream extends OutputStream {
      * @param partETags list of partial uploads
      * @throws IOException on any problem
      */
-    private CompleteMultipartUploadResult complete(List<PartETag> partETags)
+    private void complete(List<PartETag> partETags)
         throws IOException {
-      int retryCount = 0;
-      AmazonClientException lastException;
-      String operation =
-          String.format("Completing multi-part upload for key '%s'," +
-                  " id '%s' with %s partitions ",
-              key, uploadId, partETags.size());
-      do {
-        try {
-          LOG.debug(operation);
-          return writeOperationHelper.completeMultipartUpload(
-                  uploadId,
-                  partETags);
-        } catch (AmazonClientException e) {
-          lastException = e;
-          statistics.exceptionInMultipartComplete();
-        }
-      } while (shouldRetry(operation, lastException, retryCount++));
-      // this point is only reached if the operation failed more than
-      // the allowed retry count
-      throw translateException(operation, key, lastException);
+      AtomicInteger errorCount = new AtomicInteger(0);
+      try {
+        writeOperationHelper.completeMPUwithRetries(key,
+            uploadId,
+            partETags,
+            bytesSubmitted,
+            errorCount);
+      } finally {
+        statistics.exceptionInMultipartComplete(errorCount.get());
+      }
     }
 
     /**
@@ -590,57 +670,14 @@ class S3ABlockOutputStream extends OutputStream {
       int retryCount = 0;
       AmazonClientException lastException;
       fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
-      String operation =
-          String.format("Aborting multi-part upload for '%s', id '%s",
-              writeOperationHelper, uploadId);
-      do {
-        try {
-          LOG.debug(operation);
-          writeOperationHelper.abortMultipartUpload(uploadId);
-          return;
-        } catch (AmazonClientException e) {
-          lastException = e;
-          statistics.exceptionInMultipartAbort();
-        }
-      } while (shouldRetry(operation, lastException, retryCount++));
-      // this point is only reached if the operation failed more than
-      // the allowed retry count
-      LOG.warn("Unable to abort multipart upload, you may need to purge  " +
-          "uploaded parts", lastException);
-    }
-
-    /**
-     * Predicate to determine whether a failed operation should
-     * be attempted again.
-     * If a retry is advised, the exception is automatically logged and
-     * the filesystem statistic {@link Statistic#IGNORED_ERRORS} incremented.
-     * The method then sleeps for the sleep time suggested by the sleep policy;
-     * if the sleep is interrupted then {@code Thread.interrupted()} is set
-     * to indicate the thread was interrupted; then false is returned.
-     *
-     * @param operation operation for log message
-     * @param e exception raised.
-     * @param retryCount  number of retries already attempted
-     * @return true if another attempt should be made
-     */
-    private boolean shouldRetry(String operation,
-        AmazonClientException e,
-        int retryCount) {
       try {
-        RetryPolicy.RetryAction retryAction =
-            retryPolicy.shouldRetry(e, retryCount, 0, true);
-        boolean retry = retryAction == RetryPolicy.RetryAction.RETRY;
-        if (retry) {
-          fs.incrementStatistic(IGNORED_ERRORS);
-          LOG.info("Retrying {} after exception ", operation, e);
-          Thread.sleep(retryAction.delayMillis);
-        }
-        return retry;
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        return false;
-      } catch (Exception ignored) {
-        return false;
+        writeOperationHelper.abortMultipartUpload(key, uploadId,
+            (text, e, r, i) -> statistics.exceptionInMultipartAbort());
+      } catch (IOException e) {
+        // this point is only reached if the operation failed more than
+        // the allowed retry count
+        LOG.warn("Unable to abort multipart upload,"
+            + " you may need to purge uploaded parts", e);
       }
     }
 
@@ -718,7 +755,7 @@ class S3ABlockOutputStream extends OutputStream {
   private static class ProgressableListener implements ProgressListener {
     private final Progressable progress;
 
-    public ProgressableListener(Progressable progress) {
+    ProgressableListener(Progressable progress) {
       this.progress = progress;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
index 9bc8dcd..0e3bca5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
@@ -401,7 +401,7 @@ final class S3ADataBlocks {
     }
 
     /**
-     * InputStream backed by the internal byte array
+     * InputStream backed by the internal byte array.
      *
      * @return
      */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 2171957..b08a134 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -24,6 +24,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.net.URI;
+import java.nio.file.AccessDeniedException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -31,6 +34,7 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.Objects;
@@ -44,19 +48,18 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+import com.amazonaws.services.s3.model.MultipartUpload;
 import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
@@ -68,6 +71,7 @@ import com.amazonaws.services.s3.transfer.Copy;
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
 import com.amazonaws.services.s3.transfer.Upload;
+import com.amazonaws.services.s3.transfer.model.UploadResult;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.event.ProgressEvent;
 import com.google.common.annotations.VisibleForTesting;
@@ -94,20 +98,24 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.PutTracker;
+import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration;
 import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
+import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
+import static org.apache.hadoop.fs.s3a.Invoker.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 
@@ -129,15 +137,31 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class S3AFileSystem extends FileSystem {
+public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   /**
    * Default blocksize as used in blocksize and FS status queries.
    */
   public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
+
+  /**
+   * This declares delete to be idempotent.
+   * This is an "interesting" topic in past Hadoop FS work.
+   * Essentially: with a single caller, DELETE is idempotent
+   * but in a shared filesystem, it is very much not so.
+   * Here, on the basis that this isn't a filesystem with consistency
+   * guarantees, retrying it still results in the files being deleted.
+  */
+  public static final boolean DELETE_CONSIDERED_IDEMPOTENT = true;
   private URI uri;
   private Path workingDir;
   private String username;
   private AmazonS3 s3;
+  // initial callback policy is fail-once; it's there just to assist
+  // some mock tests and other codepaths trying to call the low level
+  // APIs on an uninitialized filesystem.
+  private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
+      Invoker.LOG_EVENT);
+  private final Retried onRetry = this::operationRetried;
   private String bucket;
   private int maxKeys;
   private Listing listing;
@@ -154,7 +178,8 @@ public class S3AFileSystem extends FileSystem {
   private CannedAccessControlList cannedACL;
   private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private S3AInstrumentation instrumentation;
-  private S3AStorageStatistics storageStatistics;
+  private final S3AStorageStatistics storageStatistics =
+      createStorageStatistics();
   private long readAhead;
   private S3AInputPolicy inputPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -167,6 +192,7 @@ public class S3AFileSystem extends FileSystem {
   private S3ADataBlocks.BlockFactory blockFactory;
   private int blockOutputActiveBlocks;
   private boolean useListV1;
+  private MagicCommitIntegration committerIntegration;
 
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
@@ -194,9 +220,10 @@ public class S3AFileSystem extends FileSystem {
    */
   public void initialize(URI name, Configuration originalConf)
       throws IOException {
-    uri = S3xLoginHelper.buildFSURI(name);
+    setUri(name);
     // get the host; this is guaranteed to be non-null, non-empty
     bucket = name.getHost();
+    LOG.debug("Initializing S3AFileSystem for {}", bucket);
     // clone the configuration into one with propagated bucket options
     Configuration conf = propagateBucketOptions(originalConf, bucket);
     patchSecurityCredentialProviders(conf);
@@ -216,6 +243,7 @@ public class S3AFileSystem extends FileSystem {
           S3ClientFactory.class);
       s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
           .createS3Client(name);
+      invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
 
       maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
       listing = new Listing(this);
@@ -230,15 +258,6 @@ public class S3AFileSystem extends FileSystem {
 
       readAhead = longBytesOption(conf, READAHEAD_RANGE,
           DEFAULT_READAHEAD_RANGE, 0);
-      storageStatistics = (S3AStorageStatistics)
-          GlobalStorageStatistics.INSTANCE
-              .put(S3AStorageStatistics.NAME,
-                  new GlobalStorageStatistics.StorageStatisticsProvider() {
-                    @Override
-                    public StorageStatistics provide() {
-                      return new S3AStorageStatistics();
-                    }
-                  });
 
       int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
       if (maxThreads < 2) {
@@ -274,11 +293,17 @@ public class S3AFileSystem extends FileSystem {
 
       verifyBucketExists();
 
-      initMultipartUploads(conf);
-
       serverSideEncryptionAlgorithm = getEncryptionAlgorithm(conf);
       inputPolicy = S3AInputPolicy.getPolicy(
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
+      LOG.debug("Input fadvise policy = {}", inputPolicy);
+      boolean magicCommitterEnabled = conf.getBoolean(
+          CommitConstants.MAGIC_COMMITTER_ENABLED,
+          CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED);
+      LOG.debug("Filesystem support for magic committers {} enabled",
+          magicCommitterEnabled ? "is" : "is not");
+      committerIntegration = new MagicCommitIntegration(
+          this, magicCommitterEnabled);
 
       boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true);
 
@@ -295,13 +320,14 @@ public class S3AFileSystem extends FileSystem {
               " queue limit={}",
           blockOutputBuffer, partSize, blockOutputActiveBlocks);
 
-      metadataStore = S3Guard.getMetadataStore(this);
+      setMetadataStore(S3Guard.getMetadataStore(this));
       allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
           DEFAULT_METADATASTORE_AUTHORITATIVE);
       if (hasMetadataStore()) {
         LOG.debug("Using metadata store {}, authoritative={}",
             getMetadataStore(), allowAuthoritative);
       }
+      initMultipartUploads(conf);
     } catch (AmazonClientException e) {
       throw translateException("initializing ", new Path(name), e);
     }
@@ -309,27 +335,29 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Create the storage statistics or bind to an existing one.
+   * @return a storage statistics instance.
+   */
+  protected static S3AStorageStatistics createStorageStatistics() {
+    return (S3AStorageStatistics)
+        GlobalStorageStatistics.INSTANCE
+            .put(S3AStorageStatistics.NAME,
+                () -> new S3AStorageStatistics());
+  }
+
+  /**
    * Verify that the bucket exists. This does not check permissions,
    * not even read access.
+   * Retry policy: retrying, translated.
    * @throws FileNotFoundException the bucket is absent
    * @throws IOException any other problem talking to S3
    */
+  @Retries.RetryTranslated
   protected void verifyBucketExists()
       throws FileNotFoundException, IOException {
-    try {
-      if (!s3.doesBucketExist(bucket)) {
-        throw new FileNotFoundException("Bucket " + bucket + " does not exist");
-      }
-    } catch (AmazonS3Exception e) {
-      // this is a sign of a serious startup problem so do dump everything
-      LOG.warn(stringify(e), e);
-      throw translateException("doesBucketExist", bucket, e);
-    } catch (AmazonServiceException e) {
-      // this is a sign of a serious startup problem so do dump everything
-      LOG.warn(stringify(e), e);
-      throw translateException("doesBucketExist", bucket, e);
-    } catch (AmazonClientException e) {
-      throw translateException("doesBucketExist", bucket, e);
+    if (!invoker.retry("doesBucketExist", bucket, true,
+        () -> s3.doesBucketExist(bucket))) {
+      throw new FileNotFoundException("Bucket " + bucket + " does not exist");
     }
   }
 
@@ -362,6 +390,7 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
+  @Retries.RetryTranslated
   private void initMultipartUploads(Configuration conf) throws IOException {
     boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
         DEFAULT_PURGE_EXISTING_MULTIPART);
@@ -369,24 +398,34 @@ public class S3AFileSystem extends FileSystem {
         PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0);
 
     if (purgeExistingMultipart) {
-      Date purgeBefore =
-          new Date(new Date().getTime() - purgeExistingMultipartAge * 1000);
-
       try {
-        transfers.abortMultipartUploads(bucket, purgeBefore);
-      } catch (AmazonServiceException e) {
-        if (e.getStatusCode() == 403) {
-          instrumentation.errorIgnored();
-          LOG.debug("Failed to purging multipart uploads against {}," +
-              " FS may be read only", bucket, e);
-        } else {
-          throw translateException("purging multipart uploads", bucket, e);
-        }
+        abortOutstandingMultipartUploads(purgeExistingMultipartAge);
+      } catch (AccessDeniedException e) {
+        instrumentation.errorIgnored();
+        LOG.debug("Failed to purge multipart uploads against {}," +
+            " FS may be read only", bucket);
       }
     }
   }
 
   /**
+   * Abort all outstanding MPUs older than a given age.
+   * @param seconds time in seconds
+   * @throws IOException on any failure, including 403 "permission denied"
+   */
+  @Retries.RetryTranslated
+  public void abortOutstandingMultipartUploads(long seconds)
+      throws IOException {
+    Preconditions.checkArgument(seconds >= 0);
+    Date purgeBefore =
+        new Date(new Date().getTime() - seconds * 1000);
+    LOG.debug("Purging outstanding multipart uploads older than {}",
+        purgeBefore);
+    invoker.retry("Purging multipart uploads", bucket, true,
+        () -> transfers.abortMultipartUploads(bucket, purgeBefore));
+  }
+
+  /**
    * Return the protocol scheme for the FileSystem.
    *
    * @return "s3a"
@@ -404,6 +443,16 @@ public class S3AFileSystem extends FileSystem {
     return uri;
   }
 
+  /**
+   * Set the URI field through {@link S3xLoginHelper}.
+   * Exported for testing.
+   * @param uri filesystem URI.
+   */
+  @VisibleForTesting
+  protected void setUri(URI uri) {
+    this.uri = S3xLoginHelper.buildFSURI(uri);
+  }
+
   @Override
   public int getDefaultPort() {
     return Constants.S3A_DEFAULT_PORT;
@@ -411,6 +460,7 @@ public class S3AFileSystem extends FileSystem {
 
   /**
    * Returns the S3 client used by this filesystem.
+   * This is for internal use within the S3A code itself.
    * @return AmazonS3Client
    */
   AmazonS3 getAmazonS3Client() {
@@ -418,34 +468,55 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Returns the S3 client used by this filesystem.
+   * <i>Warning: this must only be used for testing, as it bypasses core
+   * S3A operations. </i>
+   * @param reason a justification for requesting access.
+   * @return AmazonS3Client
+   */
+  @VisibleForTesting
+  public AmazonS3 getAmazonS3ClientForTesting(String reason) {
+    LOG.warn("Access to S3A client requested, reason {}", reason);
+    return s3;
+  }
+
+  /**
+   * Set the client; used in mocking tests to force in a different client.
+   * @param client client.
+   */
+  protected void setAmazonS3Client(AmazonS3 client) {
+    Preconditions.checkNotNull(client, "client");
+    LOG.debug("Setting S3 client to {}", client);
+    s3 = client;
+  }
+
+  /**
    * Get the region of a bucket.
    * @return the region in which a bucket is located
    * @throws IOException on any failure.
    */
+  @Retries.RetryTranslated
   public String getBucketLocation() throws IOException {
     return getBucketLocation(bucket);
   }
 
   /**
    * Get the region of a bucket.
+   * Retry policy: retrying, translated.
    * @param bucketName the name of the bucket
    * @return the region in which a bucket is located
    * @throws IOException on any failure.
    */
+  @Retries.RetryTranslated
   public String getBucketLocation(String bucketName) throws IOException {
-    try {
-      return s3.getBucketLocation(bucketName);
-    } catch (AmazonClientException e) {
-      throw translateException("getBucketLocation()",
-          bucketName, e);
-    }
+    return invoker.retry("getBucketLocation()", bucketName, true,
+        ()-> s3.getBucketLocation(bucketName));
   }
 
   /**
-   * Returns the read ahead range value used by this filesystem
-   * @return
+   * Returns the read ahead range value used by this filesystem.
+   * @return the readahead range
    */
-
   @VisibleForTesting
   long getReadAheadRange() {
     return readAhead;
@@ -473,7 +544,7 @@ public class S3AFileSystem extends FileSystem {
       Configuration conf) throws IOException {
     if (directoryAllocator == null) {
       String bufferDir = conf.get(BUFFER_DIR) != null
-          ? BUFFER_DIR : "hadoop.tmp.dir";
+          ? BUFFER_DIR : HADOOP_TMP_DIR;
       directoryAllocator = new LocalDirAllocator(bufferDir);
     }
     return directoryAllocator.createTmpFileForWrite(pathStr, size, conf);
@@ -488,6 +559,23 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Set the bucket.
+   * @param bucket the bucket
+   */
+  @VisibleForTesting
+  protected void setBucket(String bucket) {
+    this.bucket = bucket;
+  }
+
+  /**
+   * Get the canned ACL of this FS.
+   * @return an ACL, if any
+   */
+  CannedAccessControlList getCannedACL() {
+    return cannedACL;
+  }
+
+  /**
    * Change the input policy for this FS.
    * @param inputPolicy new policy
    */
@@ -538,7 +626,7 @@ public class S3AFileSystem extends FileSystem {
    * @param key input key
    * @return the path from this key
    */
-  private Path keyToPath(String key) {
+  Path keyToPath(String key) {
     return new Path("/" + key);
   }
 
@@ -547,7 +635,7 @@ public class S3AFileSystem extends FileSystem {
    * @param key input key
    * @return the fully qualified path including URI scheme and bucket name.
    */
-  Path keyToQualifiedPath(String key) {
+  public Path keyToQualifiedPath(String key) {
     return qualify(keyToPath(key));
   }
 
@@ -585,7 +673,7 @@ public class S3AFileSystem extends FileSystem {
   public FSDataInputStream open(Path f, int bufferSize)
       throws IOException {
 
-    LOG.debug("Opening '{}' for reading.", f);
+    LOG.debug("Opening '{}' for reading; input policy = {}", f, inputPolicy);
     final FileStatus fileStatus = getFileStatus(f);
     if (fileStatus.isDirectory()) {
       throw new FileNotFoundException("Can't open " + f
@@ -603,12 +691,15 @@ public class S3AFileSystem extends FileSystem {
             statistics,
             instrumentation,
             readAhead,
-            inputPolicy));
+            inputPolicy,
+            invoker));
   }
 
   /**
    * Create an FSDataOutputStream at the indicated Path with write-progress
    * reporting.
+   * Retry policy: retrying, translated on the getFileStatus() probe.
+   * No data is uploaded to S3 in this call, so no retry issues arise there.
    * @param f the file name to open
    * @param permission the permission to set.
    * @param overwrite if a file with this name already exists, then if true,
@@ -625,42 +716,60 @@ public class S3AFileSystem extends FileSystem {
   public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
-    String key = pathToKey(f);
+    final Path path = qualify(f);
+    String key = pathToKey(path);
     FileStatus status = null;
     try {
       // get the status or throw an FNFE
-      status = getFileStatus(f);
+      status = getFileStatus(path);
 
       // if the thread reaches here, there is something at the path
       if (status.isDirectory()) {
         // path references a directory: automatic error
-        throw new FileAlreadyExistsException(f + " is a directory");
+        throw new FileAlreadyExistsException(path + " is a directory");
       }
       if (!overwrite) {
         // path references a file and overwrite is disabled
-        throw new FileAlreadyExistsException(f + " already exists");
+        throw new FileAlreadyExistsException(path + " already exists");
       }
-      LOG.debug("Overwriting file {}", f);
+      LOG.debug("Overwriting file {}", path);
     } catch (FileNotFoundException e) {
       // this means the file is not found
 
     }
     instrumentation.fileCreated();
+    PutTracker putTracker =
+        committerIntegration.createTracker(path, key);
+    String destKey = putTracker.getDestKey();
     return new FSDataOutputStream(
         new S3ABlockOutputStream(this,
-            key,
+            destKey,
             new SemaphoredDelegatingExecutor(boundedThreadPool,
                 blockOutputActiveBlocks, true),
             progress,
             partSize,
             blockFactory,
             instrumentation.newOutputStreamStatistics(statistics),
-            new WriteOperationHelper(key)
-        ),
+            createWriteOperationHelper(),
+            putTracker),
         null);
   }
 
   /**
+   * Create a new {@code WriteOperationHelper} instance.
+   *
+   * This class permits other low-level operations against the store.
+   * It is unstable and
+   * only intended for code with intimate knowledge of the object store.
+   * If using this, be prepared for changes even on minor point releases.
+   * @return a new helper.
+   */
+  @InterfaceAudience.Private
+  public WriteOperationHelper createWriteOperationHelper() {
+    return new WriteOperationHelper(this);
+  }
+
+  /**
    * {@inheritDoc}
    * @throws FileNotFoundException if the parent directory is not present -or
    * is not a directory.
@@ -883,7 +992,7 @@ public class S3AFileSystem extends FileSystem {
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
       }
 
-      Path parentPath = keyToPath(srcKey);
+      Path parentPath = keyToQualifiedPath(srcKey);
       RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories(
           parentPath, true);
       while (iterator.hasNext()) {
@@ -940,7 +1049,7 @@ public class S3AFileSystem extends FileSystem {
 
     if (src.getParent() != dst.getParent()) {
       deleteUnnecessaryFakeDirectories(dst.getParent());
-      createFakeDirectoryIfNecessary(src.getParent());
+      maybeCreateFakeParentDirectory(src);
     }
     return true;
   }
@@ -978,6 +1087,7 @@ public class S3AFileSystem extends FileSystem {
   /** For testing only.  See ITestS3GuardEmptyDirs. */
   @VisibleForTesting
   void setMetadataStore(MetadataStore ms) {
+    Preconditions.checkNotNull(ms);
     metadataStore = ms;
   }
 
@@ -1018,6 +1128,48 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Callback when an operation was retried.
+   * Increments the statistics of ignored errors or throttled requests,
+   * depending upon the exception class.
+   * @param ex exception.
+   */
+  public void operationRetried(Exception ex) {
+    Statistic stat = isThrottleException(ex)
+        ? STORE_IO_THROTTLED
+        : IGNORED_ERRORS;
+    instrumentation.incrementCounter(stat, 1);
+    storageStatistics.incrementCounter(stat, 1);
+  }
+
+  /**
+   * Callback from {@link Invoker} when an operation is retried.
+   * @param text text of the operation
+   * @param ex exception
+   * @param retries number of retries
+   * @param idempotent is the method idempotent
+   */
+  public void operationRetried(
+      String text,
+      Exception ex,
+      int retries,
+      boolean idempotent) {
+    operationRetried(ex);
+  }
+
+  /**
+   * Callback from {@link Invoker} when an operation against a metastore
+   * is retried.
+   * @param ex exception
+   * @param retries number of retries
+   * @param idempotent is the method idempotent
+   */
+  public void metastoreOperationRetried(Exception ex,
+      int retries,
+      boolean idempotent) {
+    operationRetried(ex);
+  }
+
+  /**
    * Get the storage statistics of this filesystem.
    * @return the storage statistics
    */
@@ -1028,11 +1180,13 @@ public class S3AFileSystem extends FileSystem {
 
   /**
    * Request object metadata; increments counters in the process.
+   * Retry policy: retry untranslated.
    * @param key key
    * @return the metadata
+   * @throws IOException if the retry invocation raises one (it shouldn't).
    */
-  protected ObjectMetadata getObjectMetadata(String key) {
-    incrementStatistic(OBJECT_METADATA_REQUESTS);
+  @Retries.RetryRaw
+  protected ObjectMetadata getObjectMetadata(String key) throws IOException {
     GetObjectMetadataRequest request =
         new GetObjectMetadataRequest(bucket, key);
     //SSE-C requires to be filled in if enabled for object metadata
@@ -1040,7 +1194,11 @@ public class S3AFileSystem extends FileSystem {
         StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))){
       request.setSSECustomerKey(generateSSECustomerKey());
     }
-    ObjectMetadata meta = s3.getObjectMetadata(request);
+    ObjectMetadata meta = invoker.retryUntranslated("GET " + key, true,
+        () -> {
+          incrementStatistic(OBJECT_METADATA_REQUESTS);
+          return s3.getObjectMetadata(request);
+        });
     incrementReadOperations();
     return meta;
   }
@@ -1048,40 +1206,68 @@ public class S3AFileSystem extends FileSystem {
   /**
    * Initiate a {@code listObjects} operation, incrementing metrics
    * in the process.
+   *
+   * Retry policy: retry untranslated.
    * @param request request to initiate
    * @return the results
+   * @throws IOException if the retry invocation raises one (it shouldn't).
    */
-  protected S3ListResult listObjects(S3ListRequest request) {
-    incrementStatistic(OBJECT_LIST_REQUESTS);
+  @Retries.RetryRaw
+  protected S3ListResult listObjects(S3ListRequest request) throws IOException {
     incrementReadOperations();
+    incrementStatistic(OBJECT_LIST_REQUESTS);
+    validateListArguments(request);
+    return invoker.retryUntranslated(
+        request.toString(),
+        true,
+        () -> {
+          if (useListV1) {
+            return S3ListResult.v1(s3.listObjects(request.getV1()));
+          } else {
+            return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
+          }
+        });
+  }
+
+  /**
+   * Validate the list arguments with this bucket's settings.
+   * @param request the request to validate
+   */
+  private void validateListArguments(S3ListRequest request) {
     if (useListV1) {
       Preconditions.checkArgument(request.isV1());
-      return S3ListResult.v1(s3.listObjects(request.getV1()));
     } else {
       Preconditions.checkArgument(!request.isV1());
-      return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
     }
   }
 
   /**
    * List the next set of objects.
+   * Retry policy: retry untranslated.
    * @param request last list objects request to continue
    * @param prevResult last paged result to continue from
    * @return the next result object
+   * @throws IOException none, just there for retryUntranslated.
    */
+  @Retries.RetryRaw
   protected S3ListResult continueListObjects(S3ListRequest request,
-      S3ListResult prevResult) {
-    incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS);
+      S3ListResult prevResult) throws IOException {
     incrementReadOperations();
-    if (useListV1) {
-      Preconditions.checkArgument(request.isV1());
-      return S3ListResult.v1(s3.listNextBatchOfObjects(prevResult.getV1()));
-    } else {
-      Preconditions.checkArgument(!request.isV1());
-      request.getV2().setContinuationToken(prevResult.getV2()
-          .getNextContinuationToken());
-      return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
-    }
+    validateListArguments(request);
+    return invoker.retryUntranslated(
+        request.toString(),
+        true,
+        () -> {
+          incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS);
+          if (useListV1) {
+            return S3ListResult.v1(
+                s3.listNextBatchOfObjects(prevResult.getV1()));
+          } else {
+            request.getV2().setContinuationToken(prevResult.getV2()
+                .getNextContinuationToken());
+            return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
+          }
+        });
   }
 
   /**
@@ -1101,16 +1287,52 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
-   * Delete an object.
+   * Delete an object. This is the low-level internal call which
+   * <i>does not</i> update the metastore.
    * Increments the {@code OBJECT_DELETE_REQUESTS} and write
    * operation statistics.
+   *
+   * Retry policy: retry untranslated; delete considered idempotent.
    * @param key key to blob to delete.
+   * @throws AmazonClientException problems working with S3
+   * @throws InvalidRequestException if the request was rejected due to
+   * a mistaken attempt to delete the root directory.
    */
-  private void deleteObject(String key) throws InvalidRequestException {
+  @VisibleForTesting
+  @Retries.RetryRaw
+  protected void deleteObject(String key)
+      throws AmazonClientException, IOException {
     blockRootDelete(key);
     incrementWriteOperations();
-    incrementStatistic(OBJECT_DELETE_REQUESTS);
-    s3.deleteObject(bucket, key);
+    invoker.retryUntranslated("Delete "+ bucket + ":/" + key,
+        DELETE_CONSIDERED_IDEMPOTENT,
+        ()-> {
+          incrementStatistic(OBJECT_DELETE_REQUESTS);
+          s3.deleteObject(bucket, key);
+          return null;
+        });
+  }
+
+  /**
+   * Delete an object, also updating the metastore.
+   * This call does <i>not</i> create any mock parent entries.
+   * Retry policy: retry untranslated; delete considered idempotent.
+   * @param f path to delete
+   * @param key key of entry
+   * @param isFile is the path a file (used for instrumentation only)
+   * @throws AmazonClientException problems working with S3
+   * @throws IOException IO failure
+   */
+  @Retries.RetryRaw
+  void deleteObjectAtPath(Path f, String key, boolean isFile)
+      throws AmazonClientException, IOException {
+    if (isFile) {
+      instrumentation.fileDeleted(1);
+    } else {
+      instrumentation.directoryDeleted();
+    }
+    deleteObject(key);
+    metadataStore.delete(f);
   }
 
   /**
@@ -1130,17 +1352,23 @@ public class S3AFileSystem extends FileSystem {
    * Perform a bulk object delete operation.
    * Increments the {@code OBJECT_DELETE_REQUESTS} and write
    * operation statistics.
+   * Retry policy: retry untranslated; delete considered idempotent.
    * @param deleteRequest keys to delete on the s3-backend
    * @throws MultiObjectDeleteException one or more of the keys could not
    * be deleted.
    * @throws AmazonClientException amazon-layer failure.
    */
+  @Retries.RetryRaw
   private void deleteObjects(DeleteObjectsRequest deleteRequest)
-      throws MultiObjectDeleteException, AmazonClientException {
+      throws MultiObjectDeleteException, AmazonClientException, IOException {
     incrementWriteOperations();
-    incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
     try {
-      s3.deleteObjects(deleteRequest);
+      invoker.retryUntranslated("delete",
+          DELETE_CONSIDERED_IDEMPOTENT,
+          () -> {
+            incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
+            return s3.deleteObjects(deleteRequest);
+          });
     } catch (MultiObjectDeleteException e) {
       // one or more of the operations failed.
       List<MultiObjectDeleteException.DeleteError> errors = e.getErrors();
@@ -1185,6 +1413,7 @@ public class S3AFileSystem extends FileSystem {
       ObjectMetadata metadata,
       InputStream inputStream) {
     Preconditions.checkNotNull(inputStream);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(key), "Null/empty key");
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         inputStream, metadata);
     setOptionalPutRequestParameters(putObjectRequest);
@@ -1231,36 +1460,32 @@ public class S3AFileSystem extends FileSystem {
    * Because the operation is async, any stream supplied in the request
    * must reference data (files, buffers) which stay valid until the upload
    * completes.
+   * Retry policy: N/A: the transfer manager is performing the upload.
    * @param putObjectRequest the request
    * @return the upload initiated
    */
+  @Retries.OnceRaw
   public UploadInfo putObject(PutObjectRequest putObjectRequest) {
-    long len;
-    if (putObjectRequest.getFile() != null) {
-      len = putObjectRequest.getFile().length();
-    } else {
-      len = putObjectRequest.getMetadata().getContentLength();
-    }
+    long len = getPutRequestLength(putObjectRequest);
+    LOG.debug("PUT {} bytes to {} via transfer manager ",
+        len, putObjectRequest.getKey());
     incrementPutStartStatistics(len);
-    try {
-      Upload upload = transfers.upload(putObjectRequest);
-      incrementPutCompletedStatistics(true, len);
-      return new UploadInfo(upload, len);
-    } catch (AmazonClientException e) {
-      incrementPutCompletedStatistics(false, len);
-      throw e;
-    }
+    Upload upload = transfers.upload(putObjectRequest);
+    return new UploadInfo(upload, len);
   }
 
   /**
    * PUT an object directly (i.e. not via the transfer manager).
    * Byte length is calculated from the file length, or, if there is no
    * file, from the content length of the header.
+   *
+   * Retry Policy: none.
    * <i>Important: this call will close any input stream in the request.</i>
    * @param putObjectRequest the request
    * @return the upload initiated
    * @throws AmazonClientException on problems
    */
+  @Retries.OnceRaw
   PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
       throws AmazonClientException {
     long len = getPutRequestLength(putObjectRequest);
@@ -1269,6 +1494,8 @@ public class S3AFileSystem extends FileSystem {
     try {
       PutObjectResult result = s3.putObject(putObjectRequest);
       incrementPutCompletedStatistics(true, len);
+      // update metadata
+      finishedWrite(putObjectRequest.getKey(), len);
       return result;
     } catch (AmazonClientException e) {
       incrementPutCompletedStatistics(false, len);
@@ -1297,11 +1524,14 @@ public class S3AFileSystem extends FileSystem {
    * Upload part of a multi-partition file.
    * Increments the write and put counters.
    * <i>Important: this call does not close any input stream in the request.</i>
+   *
+   * Retry Policy: none.
    * @param request request
    * @return the result of the operation.
    * @throws AmazonClientException on problems
    */
-  public UploadPartResult uploadPart(UploadPartRequest request)
+  @Retries.OnceRaw
+  UploadPartResult uploadPart(UploadPartRequest request)
       throws AmazonClientException {
     long len = request.getPartSize();
     incrementPutStartStatistics(len);
@@ -1366,7 +1596,7 @@ public class S3AFileSystem extends FileSystem {
 
   /**
    * A helper method to delete a list of keys on a s3-backend.
-   *
+   * Retry policy: retry untranslated; delete considered idempotent.
    * @param keysToDelete collection of keys to delete on the s3-backend.
    *        if empty, no request is made of the object store.
    * @param clearKeys clears the keysToDelete-list after processing the list
@@ -1379,10 +1609,11 @@ public class S3AFileSystem extends FileSystem {
    * @throws AmazonClientException amazon-layer failure.
    */
   @VisibleForTesting
+  @Retries.RetryRaw
   void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
       boolean clearKeys, boolean deleteFakeDir)
       throws MultiObjectDeleteException, AmazonClientException,
-      InvalidRequestException {
+      IOException {
     if (keysToDelete.isEmpty()) {
       // exit fast if there are no keys to delete
       return;
@@ -1415,9 +1646,12 @@ public class S3AFileSystem extends FileSystem {
    * @param recursive if path is a directory and set to
    * true, the directory is deleted else throws an exception. In
    * case of a file the recursive can be set to either true or false.
-   * @return  true if delete is successful else false.
+   * @return true if the path existed and then was deleted; false if there
+   * was no path in the first place, or the corner cases of root path deletion
+   * have surfaced.
    * @throws IOException due to inability to delete a directory or file.
    */
+  @Retries.RetryTranslated
   public boolean delete(Path f, boolean recursive) throws IOException {
     try {
       return innerDelete(innerGetFileStatus(f, true), recursive);
@@ -1437,14 +1671,15 @@ public class S3AFileSystem extends FileSystem {
    * @param recursive if path is a directory and set to
    * true, the directory is deleted else throws an exception. In
    * case of a file the recursive can be set to either true or false.
-   * @return  true if delete is successful else false.
+   * @return true, except in the corner cases of root directory deletion
    * @throws IOException due to inability to delete a directory or file.
    * @throws AmazonClientException on failures inside the AWS SDK
    */
+  @Retries.RetryMixed
   private boolean innerDelete(S3AFileStatus status, boolean recursive)
       throws IOException, AmazonClientException {
     Path f = status.getPath();
-    LOG.debug("Delete path {} - recursive {}", f , recursive);
+    LOG.debug("Delete path {} - recursive {}", f, recursive);
 
     String key = pathToKey(f);
 
@@ -1468,10 +1703,8 @@ public class S3AFileSystem extends FileSystem {
 
       if (status.isEmptyDirectory() == Tristate.TRUE) {
         LOG.debug("Deleting fake empty directory {}", key);
-        // HADOOP-13761 S3Guard: retries here
-        deleteObject(key);
-        metadataStore.delete(f);
-        instrumentation.directoryDeleted();
+        // HADOOP-13761 s3guard: retries here
+        deleteObjectAtPath(f, key, false);
       } else {
         LOG.debug("Getting objects for directory prefix {} to delete", key);
 
@@ -1486,7 +1719,6 @@ public class S3AFileSystem extends FileSystem {
             LOG.debug("Got object to delete {}", summary.getKey());
 
             if (keys.size() == MAX_ENTRIES_TO_DELETE) {
-              // TODO: HADOOP-13761 S3Guard: retries
               removeKeys(keys, true, false);
             }
           }
@@ -1505,15 +1737,10 @@ public class S3AFileSystem extends FileSystem {
       metadataStore.deleteSubtree(f);
     } else {
       LOG.debug("delete: Path is a file");
-      instrumentation.fileDeleted(1);
-      deleteObject(key);
-      metadataStore.delete(f);
+      deleteObjectAtPath(f, key, true);
     }
 
-    Path parent = f.getParent();
-    if (parent != null) {
-      createFakeDirectoryIfNecessary(parent);
-    }
+    maybeCreateFakeParentDirectory(f);
     return true;
   }
 
@@ -1543,6 +1770,15 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Create a fake directory if required.
+   * That is: it is not the root path and the path does not exist.
+   * Retry policy: retrying; untranslated.
+   * @param f path to create
+   * @throws IOException IO problem
+   * @throws AmazonClientException untranslated AWS client problem
+   */
+  @Retries.RetryTranslated
   private void createFakeDirectoryIfNecessary(Path f)
       throws IOException, AmazonClientException {
     String key = pathToKey(f);
@@ -1553,6 +1789,21 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Create a fake parent directory if required.
+   * That is: its parent is not the root path and does not yet exist.
+   * @param path whose parent is created if needed.
+   * @throws IOException IO problem
+   * @throws AmazonClientException untranslated AWS client problem
+   */
+  void maybeCreateFakeParentDirectory(Path path)
+      throws IOException, AmazonClientException {
+    Path parent = path.getParent();
+    if (parent != null) {
+      createFakeDirectoryIfNecessary(parent);
+    }
+  }
+
+  /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.
    *
@@ -1563,11 +1814,7 @@ public class S3AFileSystem extends FileSystem {
    */
   public FileStatus[] listStatus(Path f) throws FileNotFoundException,
       IOException {
-    try {
-      return innerListStatus(f);
-    } catch (AmazonClientException e) {
-      throw translateException("listStatus", f, e);
-    }
+    return once("listStatus", f.toString(), () -> innerListStatus(f));
   }
 
   /**
@@ -1697,7 +1944,7 @@ public class S3AFileSystem extends FileSystem {
    * Existence of the directory hierarchy is not an error.
    * @param path path to create
    * @param permission to apply to f
-   * @return true if a directory was created
+   * @return true if a directory was created or already existed
    * @throws FileAlreadyExistsException there is a file at the path specified
    * @throws IOException other IO problems
    */
@@ -1787,6 +2034,7 @@ public class S3AFileSystem extends FileSystem {
    * @throws FileNotFoundException when the path does not exist
    * @throws IOException on other problems.
    */
+  @Retries.RetryTranslated
   public FileStatus getFileStatus(final Path f) throws IOException {
     return innerGetFileStatus(f, false);
   }
@@ -1801,6 +2049,7 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException on other problems.
    */
   @VisibleForTesting
+  @Retries.RetryTranslated
   S3AFileStatus innerGetFileStatus(final Path f,
       boolean needEmptyDirectoryFlag) throws IOException {
     incrementStatistic(INVOCATION_GET_FILE_STATUS);
@@ -1856,12 +2105,14 @@ public class S3AFileSystem extends FileSystem {
    * Raw {@code getFileStatus} that talks direct to S3.
    * Used to implement {@link #innerGetFileStatus(Path, boolean)},
    * and for direct management of empty directory blobs.
+   * Retry policy: retry translated.
    * @param path Qualified path
    * @param key  Key string for the path
    * @return Status
    * @throws FileNotFoundException when the path does not exist
    * @throws IOException on other problems.
    */
+  @Retries.RetryTranslated
   private S3AFileStatus s3GetFileStatus(final Path path, String key,
       Set<Path> tombstones) throws IOException {
     if (!key.isEmpty()) {
@@ -2000,8 +2251,11 @@ public class S3AFileSystem extends FileSystem {
   /**
    * Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
    * S3Guard MetadataStore, if any, will be skipped.
+   * Retry policy: retrying; translated.
    * @return true if path exists in S3
+   * @throws IOException IO failure
    */
+  @Retries.RetryTranslated
   private boolean s3Exists(final Path f) throws IOException {
     Path path = qualify(f);
     String key = pathToKey(path);
@@ -2033,12 +2287,7 @@ public class S3AFileSystem extends FileSystem {
   @Override
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
       Path dst) throws IOException {
-    try {
-      innerCopyFromLocalFile(delSrc, overwrite, src, dst);
-    } catch (AmazonClientException e) {
-      throw translateException("copyFromLocalFile(" + src + ", " + dst + ")",
-          src, e);
-    }
+    innerCopyFromLocalFile(delSrc, overwrite, src, dst);
   }
 
   /**
@@ -2060,6 +2309,7 @@ public class S3AFileSystem extends FileSystem {
    * @throws AmazonClientException failure in the AWS SDK
    * @throws IllegalArgumentException if the source path is not on the local FS
    */
+  @Retries.RetryTranslated
   private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
       Path src, Path dst)
       throws IOException, FileAlreadyExistsException, AmazonClientException {
@@ -2089,25 +2339,66 @@ public class S3AFileSystem extends FileSystem {
     }
     final String key = pathToKey(dst);
     final ObjectMetadata om = newObjectMetadata(srcfile.length());
+    Progressable progress = null;
     PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
+    invoker.retry("copyFromLocalFile(" + src + ")", dst.toString(), true,
+        () -> executePut(putObjectRequest, progress));
+    if (delSrc) {
+      local.delete(src, false);
+    }
+  }
+
+  /**
+   * Execute a PUT via the transfer manager, blocking for completion,
+   * updating the metastore afterwards.
+   * If the waiting for completion is interrupted, the upload will be
+   * aborted before an {@code InterruptedIOException} is thrown.
+   * @param putObjectRequest request
+   * @param progress optional progress callback
+   * @return the upload result
+   * @throws InterruptedIOException if the blocking was interrupted.
+   */
+  @Retries.OnceRaw
+  UploadResult executePut(PutObjectRequest putObjectRequest,
+      Progressable progress)
+      throws InterruptedIOException {
+    String key = putObjectRequest.getKey();
     UploadInfo info = putObject(putObjectRequest);
     Upload upload = info.getUpload();
     ProgressableProgressListener listener = new ProgressableProgressListener(
-        this, key, upload, null);
+        this, key, upload, progress);
     upload.addProgressListener(listener);
-    try {
-      upload.waitForUploadResult();
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException("Interrupted copying " + src
-          + " to "  + dst + ", cancelling");
-    }
+    UploadResult result = waitForUploadCompletion(key, info);
     listener.uploadCompleted();
-
-    // This will delete unnecessary fake parent directories
+    // post-write actions
     finishedWrite(key, info.getLength());
+    return result;
+  }
 
-    if (delSrc) {
-      local.delete(src, false);
+  /**
+   * Wait for an upload to complete.
+   * If the waiting for completion is interrupted, the upload will be
+   * aborted before an {@code InterruptedIOException} is thrown.
+   * @param key destination key
+   * @param uploadInfo upload to wait for
+   * @return the upload result
+   * @throws InterruptedIOException if the blocking was interrupted.
+   */
+  UploadResult waitForUploadCompletion(String key, UploadInfo uploadInfo)
+      throws InterruptedIOException {
+    Upload upload = uploadInfo.getUpload();
+    try {
+      UploadResult result = upload.waitForUploadResult();
+      incrementPutCompletedStatistics(true, uploadInfo.getLength());
+      return result;
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted: aborting upload");
+      incrementPutCompletedStatistics(false, uploadInfo.getLength());
+      upload.abort();
+      throw (InterruptedIOException)
+          new InterruptedIOException("Interrupted in PUT to "
+              + keyToQualifiedPath(key))
+          .initCause(e);
     }
   }
 
@@ -2211,6 +2502,21 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Initiate a multipart upload from the preconfigured request.
+   * Retry policy: none + untranslated.
+   * @param request request to initiate
+   * @return the result of the call
+   * @throws AmazonClientException on failures inside the AWS SDK
+   * @throws IOException Other IO problems
+   */
+  @Retries.OnceRaw
+  InitiateMultipartUploadResult initiateMultipartUpload(
+      InitiateMultipartUploadRequest request) throws IOException {
+    LOG.debug("Initiate multipart upload to {}", request.getKey());
+    incrementStatistic(OBJECT_MULTIPART_UPLOAD_INITIATED);
+    return getAmazonS3Client().initiateMultipartUpload(request);
+  }
 
   protected void setOptionalCopyObjectRequestParameters(
       CopyObjectRequest copyObjectRequest) throws IOException {
@@ -2286,6 +2592,7 @@ public class S3AFileSystem extends FileSystem {
    * @param length  total length of file written
    */
   @InterfaceAudience.Private
+  @Retries.RetryTranslated("Exceptions are swallowed")
   void finishedWrite(String key, long length) {
     LOG.debug("Finished write to {}, len {}", key, length);
     Path p = keyToQualifiedPath(key);
@@ -2310,9 +2617,10 @@ public class S3AFileSystem extends FileSystem {
 
   /**
    * Delete mock parent directories which are no longer needed.
-   * This code swallows IO exceptions encountered
+   * Retry policy: retrying; exceptions swallowed.
    * @param path path
    */
+  @Retries.RetryRaw("Exceptions are swallowed")
   private void deleteUnnecessaryFakeDirectories(Path path) {
     List<DeleteObjectsRequest.KeyVersion> keysToRemove = new ArrayList<>();
     while (!path.isRoot()) {
@@ -2324,7 +2632,7 @@ public class S3AFileSystem extends FileSystem {
     }
     try {
       removeKeys(keysToRemove, false, true);
-    } catch(AmazonClientException | InvalidRequestException e) {
+    } catch(AmazonClientException | IOException e) {
       instrumentation.errorIgnored();
       if (LOG.isDebugEnabled()) {
         StringBuilder sb = new StringBuilder();
@@ -2336,9 +2644,15 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Create a fake directory, always ending in "/".
+   * Retry policy: retrying; translated.
+   * @param objectName name of directory object.
+   * @throws IOException IO failure
+   */
+  @Retries.RetryTranslated
   private void createFakeDirectory(final String objectName)
-      throws AmazonClientException, AmazonServiceException,
-      InterruptedIOException {
+      throws IOException {
     if (!objectName.endsWith("/")) {
       createEmptyObject(objectName + "/");
     } else {
@@ -2346,10 +2660,15 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
-  // Used to create an empty file that represents an empty directory
+  /**
+   * Used to create an empty file that represents an empty directory.
+   * Retry policy: retrying; translated.
+   * @param objectName object to create
+   * @throws IOException IO failure
+   */
+  @Retries.RetryTranslated
   private void createEmptyObject(final String objectName)
-      throws AmazonClientException, AmazonServiceException,
-      InterruptedIOException {
+      throws IOException {
     final InputStream im = new InputStream() {
       @Override
       public int read() throws IOException {
@@ -2360,12 +2679,9 @@ public class S3AFileSystem extends FileSystem {
     PutObjectRequest putObjectRequest = newPutObjectRequest(objectName,
         newObjectMetadata(0L),
         im);
-    UploadInfo info = putObject(putObjectRequest);
-    try {
-      info.getUpload().waitForUploadResult();
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException("Interrupted creating " + objectName);
-    }
+    invoker.retry("PUT 0-byte object ", objectName,
+         true,
+        () -> putObjectDirect(putObjectRequest));
     incrementPutProgressStatistics(objectName, 0);
     instrumentation.directoryCreated();
   }
@@ -2473,6 +2789,7 @@ public class S3AFileSystem extends FileSystem {
     sb.append(", metastore=").append(metadataStore);
     sb.append(", authoritative=").append(allowAuthoritative);
     sb.append(", useListV1=").append(useListV1);
+    sb.append(", magicCommitter=").append(isMagicCommitEnabled());
     sb.append(", boundedExecutor=").append(boundedThreadPool);
     sb.append(", unboundedExecutor=").append(unboundedThreadPool);
     sb.append(", statistics {")
@@ -2512,6 +2829,24 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Is magic commit enabled?
+   * @return true if magic commit support is turned on.
+   */
+  public boolean isMagicCommitEnabled() {
+    return committerIntegration.isMagicCommitEnabled();
+  }
+
+  /**
+   * Predicate: is a path a magic commit path?
+   * True if magic commit is enabled and the path qualifies as special.
+   * @param path path to examine
+   * @return true if the path is or is under a magic directory
+   */
+  public boolean isMagicCommitPath(Path path) {
+    return committerIntegration.isMagicCommitPath(path);
+  }
+
+  /**
    * Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}.
    * {@inheritDoc}
    */
@@ -2686,6 +3021,7 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException if any I/O error occurred
    */
   @Override
+  @Retries.OnceTranslated
   public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
       final PathFilter filter)
       throws FileNotFoundException, IOException {
@@ -2738,203 +3074,91 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
-   * Helper for an ongoing write operation.
-   * <p>
-   * It hides direct access to the S3 API from the output stream,
-   * and is a location where the object upload process can be evolved/enhanced.
-   * <p>
-   * Features
-   * <ul>
-   *   <li>Methods to create and submit requests to S3, so avoiding
-   *   all direct interaction with the AWS APIs.</li>
-   *   <li>Some extra preflight checks of arguments, so failing fast on
-   *   errors.</li>
-   *   <li>Callbacks to let the FS know of events in the output stream
-   *   upload process.</li>
-   * </ul>
-   *
-   * Each instance of this state is unique to a single output stream.
-   */
-  final class WriteOperationHelper {
-    private final String key;
-
-    private WriteOperationHelper(String key) {
-      this.key = key;
-    }
-
-    /**
-     * Create a {@link PutObjectRequest} request.
-     * If {@code length} is set, the metadata is configured with the size of
-     * the upload.
-     * @param inputStream source data.
-     * @param length size, if known. Use -1 for not known
-     * @return the request
-     */
-    PutObjectRequest newPutRequest(InputStream inputStream, long length) {
-      PutObjectRequest request = newPutObjectRequest(key,
-          newObjectMetadata(length), inputStream);
-      return request;
-    }
-
-    /**
-     * Create a {@link PutObjectRequest} request to upload a file.
-     * @param sourceFile source file
-     * @return the request
-     */
-    PutObjectRequest newPutRequest(File sourceFile) {
-      int length = (int) sourceFile.length();
-      PutObjectRequest request = newPutObjectRequest(key,
-          newObjectMetadata(length), sourceFile);
-      return request;
-    }
-
-    /**
-     * Callback on a successful write.
-     */
-    void writeSuccessful(long length) {
-      finishedWrite(key, length);
-    }
-
-    /**
-     * Callback on a write failure.
-     * @param e Any exception raised which triggered the failure.
-     */
-    void writeFailed(Exception e) {
-      LOG.debug("Write to {} failed", this, e);
-    }
-
-    /**
-     * Create a new object metadata instance.
-     * Any standard metadata headers are added here, for example:
-     * encryption.
-     * @param length size, if known. Use -1 for not known
-     * @return a new metadata instance
-     */
-    public ObjectMetadata newObjectMetadata(long length) {
-      return S3AFileSystem.this.newObjectMetadata(length);
-    }
-
-    /**
-     * Start the multipart upload process.
-     * @return the upload result containing the ID
-     * @throws IOException IO problem
-     */
-    String initiateMultiPartUpload() throws IOException {
-      LOG.debug("Initiating Multipart upload");
-      final InitiateMultipartUploadRequest initiateMPURequest =
-          new InitiateMultipartUploadRequest(bucket,
-              key,
-              newObjectMetadata(-1));
-      initiateMPURequest.setCannedACL(cannedACL);
-      setOptionalMultipartUploadRequestParameters(initiateMPURequest);
-      try {
-        return s3.initiateMultipartUpload(initiateMPURequest)
-            .getUploadId();
-      } catch (AmazonClientException ace) {
-        throw translateException("initiate MultiPartUpload", key, ace);
+   * Listing all multipart uploads; limited to the first few hundred.
+   * Retry policy: retry, translated.
+   * @return a listing of multipart uploads.
+   * @param prefix prefix to scan for, "" for none
+   * @throws IOException IO failure, including any uprated AmazonClientException
+   */
+  @InterfaceAudience.Private
+  @Retries.RetryTranslated
+  public List<MultipartUpload> listMultipartUploads(String prefix)
+      throws IOException {
+    ListMultipartUploadsRequest request = new ListMultipartUploadsRequest(
+        bucket);
+    if (!prefix.isEmpty()) {
+      if (!prefix.endsWith("/")) {
+        prefix = prefix + "/";
       }
+      request.setPrefix(prefix);
     }
 
-    /**
-     * Complete a multipart upload operation.
-     * @param uploadId multipart operation Id
-     * @param partETags list of partial uploads
-     * @return the result
-     * @throws AmazonClientException on problems.
-     */
-    CompleteMultipartUploadResult completeMultipartUpload(String uploadId,
-        List<PartETag> partETags) throws AmazonClientException {
-      Preconditions.checkNotNull(uploadId);
-      Preconditions.checkNotNull(partETags);
-      Preconditions.checkArgument(!partETags.isEmpty(),
-          "No partitions have been uploaded");
-      LOG.debug("Completing multipart upload {} with {} parts",
-          uploadId, partETags.size());
-      // a copy of the list is required, so that the AWS SDK doesn't
-      // attempt to sort an unmodifiable list.
-      return s3.completeMultipartUpload(
-          new CompleteMultipartUploadRequest(bucket,
-              key,
-              uploadId,
-              new ArrayList<>(partETags)));
-    }
-
-    /**
-     * Abort a multipart upload operation.
-     * @param uploadId multipart operation Id
-     * @throws AmazonClientException on problems.
-     */
-    void abortMultipartUpload(String uploadId) throws AmazonClientException {
-      LOG.debug("Aborting multipart upload {}", uploadId);
-      s3.abortMultipartUpload(
-          new AbortMultipartUploadRequest(bucket, key, uploadId));
-    }
-
-    /**
-     * Create and initialize a part request of a multipart upload.
-     * Exactly one of: {@code uploadStream} or {@code sourceFile}
-     * must be specified.
-     * @param uploadId ID of ongoing upload
-     * @param partNumber current part number of the upload
-     * @param size amount of data
-     * @param uploadStream source of data to upload
-     * @param sourceFile optional source file.
-     * @return the request.
-     */
-    UploadPartRequest newUploadPartRequest(String uploadId,
-        int partNumber, int size, InputStream uploadStream, File sourceFile) {
-      Preconditions.checkNotNull(uploadId);
-      // exactly one source must be set; xor verifies this
-      Preconditions.checkArgument((uploadStream != null) ^ (sourceFile != null),
-          "Data source");
-      Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
-      Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000,
-          "partNumber must be between 1 and 10000 inclusive, but is %s",
-          partNumber);
-
-      LOG.debug("Creating part upload request for {} #{} size {}",
-          uploadId, partNumber, size);
-      UploadPartRequest request = new UploadPartRequest()
-          .withBucketName(bucket)
-          .withKey(key)
-          .withUploadId(uploadId)
-          .withPartNumber(partNumber)
-          .withPartSize(size);
-      if (uploadStream != null) {
-        // there's an upload stream. Bind to it.
-        request.setInputStream(uploadStream);
-      } else {
-        request.setFile(sourceFile);
-      }
-      return request;
-    }
-
-    /**
-     * The toString method is intended to be used in logging/toString calls.
-     * @return a string description.
-     */
-    @Override
-    public String toString() {
-      final StringBuilder sb = new StringBuilder(
-          "{bucket=").append(bucket);
-      sb.append(", key='").append(key).append('\'');
-      sb.append('}');
-      return sb.toString();
-    }
-
-    /**
-     * PUT an object directly (i.e. not via the transfer manager).
-     * @param putObjectRequest the request
-     * @return the upload initiated
-     * @throws IOException on problems
-     */
-    PutObjectResult putObject(PutObjectRequest putObjectRequest)
-        throws IOException {
-      try {
-        return putObjectDirect(putObjectRequest);
-      } catch (AmazonClientException e) {
-        throw translateException("put", putObjectRequest.getKey(), e);
-      }
+    return invoker.retry("listMultipartUploads", prefix, true,
+        () -> s3.listMultipartUploads(request).getMultipartUploads());
+  }
+
+  /**
+   * Abort a multipart upload.
+   * Retry policy: none.
+   * @param destKey destination key
+   * @param uploadId Upload ID
+   */
+  @Retries.OnceRaw
+  void abortMultipartUpload(String destKey, String uploadId) {
+    LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
+    getAmazonS3Client().abortMultipartUpload(
+        new AbortMultipartUploadRequest(getBucket(),
+            destKey,
+            uploadId));
+  }
+
+  /**
+   * Abort a multipart upload.
+   * Retry policy: none.
+   * @param upload the listed upload to abort.
+   */
+  @Retries.OnceRaw
+  void abortMultipartUpload(MultipartUpload upload) {
+    String destKey;
+    String uploadId;
+    destKey = upload.getKey();
+    uploadId = upload.getUploadId();
+    if (LOG.isInfoEnabled()) {
+      DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      LOG.info("Aborting multipart upload {} to {} initiated by {} on {}",
+          uploadId, destKey, upload.getInitiator(),
+          df.format(upload.getInitiated()));
+    }
+    getAmazonS3Client().abortMultipartUpload(
+        new AbortMultipartUploadRequest(getBucket(),
+            destKey,
+            uploadId));
+  }
+
+  /**
+   * Create a new instance of the committer statistics.
+   * @return a new committer statistics instance
+   */
+  public S3AInstrumentation.CommitterStatistics newCommitterStatistics() {
+    return instrumentation.newCommitterStatistics();
+  }
+
+  /**
+   * Return the capabilities of this filesystem instance.
+   * @param capability string to query the filesystem support for.
+   * @return whether the FS instance has the capability.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+
+    switch (capability.toLowerCase(Locale.ENGLISH)) {
+
+    case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER:
+      // capability depends on FS configuration
+      return isMagicCommitEnabled();
+
+    default:
+      return false;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 94d7701..7e6d640 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.google.common.base.Preconditions;
@@ -39,7 +39,6 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
 /**
  * The input stream for an S3A object.
@@ -86,6 +85,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private String serverSideEncryptionKey;
   private final S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
+  private final Invoker invoker;
 
   /**
    * This is the actual position within the object, used by
@@ -104,14 +104,29 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   private long contentRangeStart;
 
+  /**
+   * Create the stream.
+   * This does not attempt to open it; that is only done on the first
+   * actual read() operation.
+   * @param s3Attributes object attributes from a HEAD request
+   * @param contentLength length of content
+   * @param client S3 client to use
+   * @param stats statistics to update
+   * @param instrumentation instrumentation to update
+   * @param readahead readahead bytes
+   * @param inputPolicy IO policy
+   * @param invoker preconfigured invoker
+   */
   public S3AInputStream(S3ObjectAttributes s3Attributes,
       long contentLength,
       AmazonS3 client,
       FileSystem.Statistics stats,
       S3AInstrumentation instrumentation,
       long readahead,
-      S3AInputPolicy inputPolicy) {
-    Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), "No Bucket");
+      S3AInputPolicy inputPolicy,
+      Invoker invoker) {
+    Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
+        "No Bucket");
     Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
     Preconditions.checkArgument(contentLength >= 0, "Negative content length");
     this.bucket = s3Attributes.getBucket();
@@ -126,6 +141,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
     this.inputPolicy = inputPolicy;
     setReadahead(readahead);
+    this.invoker = invoker;
   }
 
   /**
@@ -149,22 +165,22 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
         " streamPosition={}, nextReadPosition={}",
         uri, reason, targetPos, contentRangeFinish, length,  pos, nextReadPos);
 
-    streamStatistics.streamOpened();
-    try {
-      GetObjectRequest request = new GetObjectRequest(bucket, key)
-          .withRange(targetPos, contentRangeFinish - 1);
-      if (S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
-          StringUtils.isNotBlank(serverSideEncryptionKey)){
-        request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
-      }
-      wrappedStream = client.getObject(request).getObjectContent();
-      contentRangeStart = targetPos;
-      if (wrappedStream == null) {
-        throw new IOException("Null IO stream from reopen of (" + reason +  ") "
-            + uri);
-      }
-    } catch (AmazonClientException e) {
-      throw translateException("Reopen at position " + targetPos, uri, e);
+    long opencount = streamStatistics.streamOpened();
+    GetObjectRequest request = new GetObjectRequest(bucket, key)
+        .withRange(targetPos, contentRangeFinish - 1);
+    if (S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+        StringUtils.isNotBlank(serverSideEncryptionKey)){
+      request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
+    }
+    String text = String.format("Failed to %s %s at %d",
+        (opencount == 0 ? "open" : "re-open"), uri, targetPos);
+    S3Object object = invoker.retry(text, uri, true,
+        () -> client.getObject(request));
+    wrappedStream = object.getObjectContent();
+    contentRangeStart = targetPos;
+    if (wrappedStream == null) {
+      throw new IOException("Null IO stream from reopen of (" + reason +  ") "
+          + uri);
     }
 
     this.pos = targetPos;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to