http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index da1fc5a..ef5a434 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -24,7 +24,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricStringBuilder;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
@@ -122,8 +127,23 @@ public class S3AInstrumentation {
       STREAM_WRITE_BLOCK_UPLOADS_ABORTED,
       STREAM_WRITE_TOTAL_TIME,
       STREAM_WRITE_TOTAL_DATA,
+      COMMITTER_COMMITS_CREATED,
+      COMMITTER_COMMITS_COMPLETED,
+      COMMITTER_JOBS_SUCCEEDED,
+      COMMITTER_JOBS_FAILED,
+      COMMITTER_TASKS_SUCCEEDED,
+      COMMITTER_TASKS_FAILED,
+      COMMITTER_BYTES_COMMITTED,
+      COMMITTER_BYTES_UPLOADED,
+      COMMITTER_COMMITS_FAILED,
+      COMMITTER_COMMITS_ABORTED,
+      COMMITTER_COMMITS_REVERTED,
+      COMMITTER_MAGIC_FILES_CREATED,
       S3GUARD_METADATASTORE_PUT_PATH_REQUEST,
-      S3GUARD_METADATASTORE_INITIALIZATION
+      S3GUARD_METADATASTORE_INITIALIZATION,
+      S3GUARD_METADATASTORE_RETRY,
+      S3GUARD_METADATASTORE_THROTTLED,
+      STORE_IO_THROTTLED
   };
 
 
@@ -179,8 +199,11 @@ public class S3AInstrumentation {
       gauge(statistic.getSymbol(), statistic.getDescription());
     }
     //todo need a config for the quantiles interval?
+    int interval = 1;
     quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
-        "ops", "latency", 1);
+        "ops", "latency", interval);
+    quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
+        "events", "frequency (Hz)", interval);
   }
 
   /**
@@ -372,7 +395,7 @@ public class S3AInstrumentation {
   }
 
   /**
-   * Indicate that S3A deleted one or more file.s
+   * Indicate that S3A deleted one or more files.
    * @param count number of files.
    */
   public void fileDeleted(int count) {
@@ -506,6 +529,14 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Create a new instance of the committer statistics.
+   * @return a new committer statistics instance
+   */
+  CommitterStatistics newCommitterStatistics() {
+    return new CommitterStatistics();
+  }
+
+  /**
    * Merge in the statistics of a single input stream into
    * the filesystem-wide statistics.
    * @param statistics stream statistics
@@ -584,9 +615,12 @@ public class S3AInstrumentation {
 
     /**
      * The inner stream was opened.
+     * @return the previous count
      */
-    public void streamOpened() {
+    public long streamOpened() {
+      long count = openOperations;
       openOperations++;
+      return count;
     }
 
     /**
@@ -810,10 +844,13 @@ public class S3AInstrumentation {
     }
 
     /**
-     * Note an exception in a multipart complete.
+     * Note exception in a multipart complete.
+     * @param count count of exceptions
      */
-    void exceptionInMultipartComplete() {
-      exceptionsInMultipartFinalize.incrementAndGet();
+    void exceptionInMultipartComplete(int count) {
+      if (count > 0) {
+        exceptionsInMultipartFinalize.addAndGet(count);
+      }
     }
 
     /**
@@ -832,6 +869,15 @@ public class S3AInstrumentation {
     }
 
     /**
+     * Data has been uploaded to be committed in a subsequent operation;
+     * to be called at the end of the write.
+     * @param size size in bytes
+     */
+    public void commitUploaded(long size) {
+      incrementCounter(COMMITTER_BYTES_UPLOADED, size);
+    }
+
+    /**
      * Output stream has closed.
      * Trigger merge in of all statistics not updated during operation.
      */
@@ -918,5 +964,176 @@ public class S3AInstrumentation {
     public void storeClosed() {
 
     }
+
+    /**
+     * Throttled request.
+     */
+    public void throttled() {
+      incrementCounter(S3GUARD_METADATASTORE_THROTTLED, 1);
+      addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1);
+    }
+
+    /**
+     * S3Guard is retrying after a (retryable) failure.
+     */
+    public void retrying() {
+      incrementCounter(S3GUARD_METADATASTORE_RETRY, 1);
+    }
+  }
+
+  /**
+   * Instrumentation exported to S3Guard Committers.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public final class CommitterStatistics {
+
+    /** A commit has been created. */
+    public void commitCreated() {
+      incrementCounter(COMMITTER_COMMITS_CREATED, 1);
+    }
+
+    /**
+     * Data has been uploaded to be committed in a subsequent operation.
+     * @param size size in bytes
+     */
+    public void commitUploaded(long size) {
+      incrementCounter(COMMITTER_BYTES_UPLOADED, size);
+    }
+
+    /**
+     * A commit has been completed.
+     * @param size size in bytes
+     */
+    public void commitCompleted(long size) {
+      incrementCounter(COMMITTER_COMMITS_COMPLETED, 1);
+      incrementCounter(COMMITTER_BYTES_COMMITTED, size);
+    }
+
+    /** A commit has been aborted. */
+    public void commitAborted() {
+      incrementCounter(COMMITTER_COMMITS_ABORTED, 1);
+    }
+
+    public void commitReverted() {
+      incrementCounter(COMMITTER_COMMITS_REVERTED, 1);
+    }
+
+    public void commitFailed() {
+      incrementCounter(COMMITTER_COMMITS_FAILED, 1);
+    }
+
+    public void taskCompleted(boolean success) {
+      incrementCounter(
+          success ? COMMITTER_TASKS_SUCCEEDED
+              : COMMITTER_TASKS_FAILED,
+          1);
+    }
+
+    public void jobCompleted(boolean success) {
+      incrementCounter(
+          success ? COMMITTER_JOBS_SUCCEEDED
+              : COMMITTER_JOBS_FAILED,
+          1);
+    }
+  }
+
+  /**
+   * Copy all the metrics to a map of (name, long-value).
+   * @return a map of the metrics
+   */
+  public Map<String, Long> toMap() {
+    MetricsToMap metricBuilder = new MetricsToMap(null);
+    registry.snapshot(metricBuilder, true);
+    for (Map.Entry<String, MutableCounterLong> entry :
+        streamMetrics.entrySet()) {
+      metricBuilder.tuple(entry.getKey(), entry.getValue().value());
+    }
+    return metricBuilder.getMap();
+  }
+
+  /**
+   * Convert all metrics to a map.
+   */
+  private static class MetricsToMap extends MetricsRecordBuilder {
+    private final MetricsCollector parent;
+    private final Map<String, Long> map =
+        new HashMap<>(COUNTERS_TO_CREATE.length * 2);
+
+    MetricsToMap(MetricsCollector parent) {
+      this.parent = parent;
+    }
+
+    @Override
+    public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder add(MetricsTag tag) {
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder add(AbstractMetric metric) {
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder setContext(String value) {
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+      return tuple(info, value);
+    }
+
+    @Override
+    public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+      return tuple(info, value);
+    }
+
+    @Override
+    public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+      return tuple(info, value);
+    }
+
+    @Override
+    public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+      return tuple(info, value);
+    }
+
+    public MetricsToMap tuple(MetricsInfo info, long value) {
+      return tuple(info.name(), value);
+    }
+
+    public MetricsToMap tuple(String name, long value) {
+      map.put(name, value);
+      return this;
+    }
+
+    @Override
+    public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+      return tuple(info, (long) value);
+    }
+
+    @Override
+    public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+      return tuple(info, (long) value);
+    }
+
+    @Override
+    public MetricsCollector parent() {
+      return parent;
+    }
+
+    /**
+     * Get the map.
+     * @return the map of metrics
+     */
+    public Map<String, Long> getMap() {
+      return map;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
new file mode 100644
index 0000000..e37a554
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.file.AccessDeniedException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.AmazonClientException;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.ConnectTimeoutException;
+
+import static org.apache.hadoop.io.retry.RetryPolicies.*;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * The S3A request retry policy.
+ *
+ * This uses the retry options in the configuration file to determine retry
+ * count and delays for "normal" retries and separately, for throttling;
+ * the latter is best handled for longer with an exponential back-off.
+ *
+ * <ol>
+ * <li> Those exceptions considered unrecoverable (networking) are
+ *    failed fast.</li>
+ * <li>All non-IOEs are failed immediately. Assumed: bugs in code,
+ *    unrecoverable errors, etc</li>
+ * </ol>
+ *
+ * For non-idempotent operations, only failures due to throttling or
+ * from failures which are known to only arise prior to talking to S3
+ * are retried.
+ *
+ * The retry policy is all built around that of the normal IO exceptions,
+ * particularly those extracted from
+ * {@link S3AUtils#translateException(String, Path, AmazonClientException)}.
+ * Because the {@link #shouldRetry(Exception, int, int, boolean)} method
+ * does this translation if an {@code AmazonClientException} is processed,
+ * the policy defined for the IOEs also applies to the original exceptions.
+ *
+ * Put differently: this retry policy aims to work for handlers of the
+ * untranslated exceptions, as well as the translated ones.
+ * @see <a 
href="http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html";>S3 
Error responses</a>
+ * @see <a 
href="http://docs.aws.amazon.com/AmazonS3/latest/dev/ErrorBestPractices.html";>Amazon
 S3 Error Best Practices</a>
+ * @see <a 
href="http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/CommonErrors.html";>Dynamo
 DB Commmon errors</a>
+ */
+public class S3ARetryPolicy implements RetryPolicy {
+
+  private final RetryPolicy retryPolicy;
+
+  /**
+   * Instantiate.
+   * @param conf configuration to read.
+   */
+  public S3ARetryPolicy(Configuration conf) {
+    Preconditions.checkArgument(conf != null, "Null configuration");
+
+    // base policy from configuration
+    RetryPolicy fixedRetries = retryUpToMaximumCountWithFixedSleep(
+        conf.getInt(RETRY_LIMIT, RETRY_LIMIT_DEFAULT),
+        conf.getTimeDuration(RETRY_INTERVAL,
+            RETRY_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS),
+        TimeUnit.MILLISECONDS);
+
+    // which is wrapped by a rejection of all non-idempotent calls except
+    // for specific failures.
+    RetryPolicy retryIdempotentCalls = new FailNonIOEs(
+        new IdempotencyRetryFilter(fixedRetries));
+
+    // and a separate policy for throttle requests, which are considered
+    // repeatable, even for non-idempotent calls, as the service
+    // rejected the call entirely
+    RetryPolicy throttlePolicy = exponentialBackoffRetry(
+        conf.getInt(RETRY_THROTTLE_LIMIT, RETRY_THROTTLE_LIMIT_DEFAULT),
+        conf.getTimeDuration(RETRY_THROTTLE_INTERVAL,
+            RETRY_THROTTLE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS),
+        TimeUnit.MILLISECONDS);
+
+    // no retry on network and tangible API issues
+    RetryPolicy fail = RetryPolicies.TRY_ONCE_THEN_FAIL;
+
+    // client connectivity: fixed retries without care for idempotency
+    RetryPolicy connectivityFailure = fixedRetries;
+
+    // the policy map maps the exact classname; subclasses do not
+    // inherit policies.
+    Map<Class<? extends Exception>, RetryPolicy> policyMap = new HashMap<>();
+
+    // failfast exceptions which we consider unrecoverable
+    policyMap.put(UnknownHostException.class, fail);
+    policyMap.put(NoRouteToHostException.class, fail);
+    policyMap.put(InterruptedException.class, fail);
+    // note this does not pick up subclasses (like socket timeout)
+    policyMap.put(InterruptedIOException.class, fail);
+    policyMap.put(AWSRedirectException.class, fail);
+    // interesting question: should this be retried ever?
+    policyMap.put(AccessDeniedException.class, fail);
+    policyMap.put(FileNotFoundException.class, fail);
+    policyMap.put(InvalidRequestException.class, fail);
+
+    // should really be handled by resubmitting to new location;
+    // that's beyond the scope of this retry policy
+    policyMap.put(AWSRedirectException.class, fail);
+
+    // throttled requests are can be retried, always
+    policyMap.put(AWSServiceThrottledException.class, throttlePolicy);
+
+    // connectivity problems are retried without worrying about idempotency
+    policyMap.put(ConnectTimeoutException.class, connectivityFailure);
+
+    // this can be a sign of an HTTP connection breaking early.
+    // which can be reacted to by another attempt if the request was 
idempotent.
+    // But: could also be a sign of trying to read past the EOF on a GET,
+    // which isn't going to be recovered from
+    policyMap.put(EOFException.class, retryIdempotentCalls);
+
+    // policy on a 400/bad request still ambiguous. Given it
+    // comes and goes on test runs: try again
+    policyMap.put(AWSBadRequestException.class, connectivityFailure);
+
+    // Status 500 error code is also treated as a connectivity problem
+    policyMap.put(AWSStatus500Exception.class, connectivityFailure);
+
+    // server didn't respond.
+    policyMap.put(AWSNoResponseException.class, retryIdempotentCalls);
+
+    // other operations
+    policyMap.put(AWSClientIOException.class, retryIdempotentCalls);
+    policyMap.put(AWSServiceIOException.class, retryIdempotentCalls);
+    policyMap.put(AWSS3IOException.class, retryIdempotentCalls);
+    policyMap.put(SocketTimeoutException.class, retryIdempotentCalls);
+
+    // Dynamo DB exceptions
+    // asking for more than you should get. It's a retry but should be logged
+    // trigger sleep
+    policyMap.put(ProvisionedThroughputExceededException.class, 
throttlePolicy);
+
+    retryPolicy = retryByException(retryIdempotentCalls, policyMap);
+  }
+
+  @Override
+  public RetryAction shouldRetry(Exception exception,
+      int retries,
+      int failovers,
+      boolean idempotent) throws Exception {
+    Exception ex = exception;
+    if (exception instanceof AmazonClientException) {
+      // uprate the amazon client exception for the purpose of exception
+      // processing.
+      ex = S3AUtils.translateException("", "",
+          (AmazonClientException) exception);
+    }
+    return retryPolicy.shouldRetry(ex, retries, failovers, idempotent);
+  }
+
+  /**
+   * Policy which fails fast any non-idempotent call; hands off
+   * all idempotent calls to the next retry policy.
+   */
+  private static final class IdempotencyRetryFilter implements RetryPolicy {
+
+    private final RetryPolicy next;
+
+    IdempotencyRetryFilter(RetryPolicy next) {
+      this.next = next;
+    }
+
+    @Override
+    public RetryAction shouldRetry(Exception e,
+        int retries,
+        int failovers,
+        boolean idempotent) throws Exception {
+      return
+          idempotent ?
+              next.shouldRetry(e, retries, failovers, true)
+              : RetryAction.FAIL;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "IdempotencyRetryFilter{");
+      sb.append("next=").append(next);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * All non-IOE exceptions are failed.
+   */
+  private static final class FailNonIOEs implements RetryPolicy {
+
+    private final RetryPolicy next;
+
+    private FailNonIOEs(RetryPolicy next) {
+      this.next = next;
+    }
+
+    @Override
+    public RetryAction shouldRetry(Exception e,
+        int retries,
+        int failovers,
+        boolean isIdempotentOrAtMostOnce) throws Exception {
+      return
+          e instanceof IOException ?
+              next.shouldRetry(e, retries, failovers, true)
+              : RetryAction.FAIL;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
index c1cf7cf..4b12667 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.EnumMap;
@@ -35,8 +36,10 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class S3AStorageStatistics extends StorageStatistics {
-  private static final Logger LOG = S3AFileSystem.LOG;
+public class S3AStorageStatistics extends StorageStatistics
+    implements Iterable<StorageStatistics.LongStatistic> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3AStorageStatistics.class);
 
   public static final String NAME = "S3AStorageStatistics";
   private final Map<Statistic, AtomicLong> opsCount =
@@ -97,6 +100,11 @@ public class S3AStorageStatistics extends StorageStatistics 
{
   }
 
   @Override
+  public Iterator<LongStatistic> iterator() {
+    return getLongStatistics();
+  }
+
+  @Override
   public Long getLong(String key) {
     final Statistic type = Statistic.fromSymbol(key);
     return type == null ? null : opsCount.get(type).get();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 6e6f4b6..70926e6 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -18,11 +18,17 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import com.amazonaws.AbortedException;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkBaseException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
+import com.amazonaws.services.dynamodbv2.model.LimitExceededException;
+import 
com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 
@@ -32,12 +38,18 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
+import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.ProviderUtils;
 
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -46,12 +58,15 @@ import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.nio.file.AccessDeniedException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
@@ -63,8 +78,7 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
 @InterfaceStability.Evolving
 public final class S3AUtils {
 
-  /** Reuse the S3AFileSystem log. */
-  private static final Logger LOG = S3AFileSystem.LOG;
+  private static final Logger LOG = LoggerFactory.getLogger(S3AUtils.class);
   static final String CONSTRUCTOR_EXCEPTION = "constructor exception";
   static final String INSTANTIATION_EXCEPTION
       = "instantiation exception";
@@ -95,6 +109,8 @@ public final class S3AUtils {
       S3AEncryptionMethods.SSE_S3.getMethod()
           + " is enabled but an encryption key was set in "
           + SERVER_SIDE_ENCRYPTION_KEY;
+  private static final String EOF_MESSAGE_IN_XML_PARSER
+      = "Failed to sanitize XML document destined for handler class";
 
 
   private S3AUtils() {
@@ -106,6 +122,10 @@ public final class S3AUtils {
    * {@link AmazonClientException} passed in, and any status codes included
    * in the operation. That is: HTTP error codes are examined and can be
    * used to build a more specific response.
+   *
+   * @see <a 
href="http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html";>S3 
Error responses</a>
+   * @see <a 
href="http://docs.aws.amazon.com/AmazonS3/latest/dev/ErrorBestPractices.html";>Amazon
 S3 Error Best Practices</a>
+   * @see <a 
href="http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/CommonErrors.html";>Dynamo
 DB Commmon errors</a>
    * @param operation operation
    * @param path path operated on (must not be null)
    * @param exception amazon exception raised
@@ -131,19 +151,28 @@ public final class S3AUtils {
   @SuppressWarnings("ThrowableInstanceNeverThrown")
   public static IOException translateException(String operation,
       String path,
-      AmazonClientException exception) {
+      SdkBaseException exception) {
     String message = String.format("%s%s: %s",
         operation,
         path != null ? (" on " + path) : "",
         exception);
     if (!(exception instanceof AmazonServiceException)) {
-      if (containsInterruptedException(exception)) {
-        return (IOException)new InterruptedIOException(message)
-            .initCause(exception);
+      Exception innerCause = containsInterruptedException(exception);
+      if (innerCause != null) {
+        // interrupted IO, or a socket exception underneath that class
+        return translateInterruptedException(exception, innerCause, message);
+      }
+      if (signifiesConnectionBroken(exception)) {
+        // call considered an sign of connectivity failure
+        return (EOFException)new EOFException(message).initCause(exception);
       }
       return new AWSClientIOException(message, exception);
     } else {
-
+      if (exception instanceof AmazonDynamoDBException) {
+        // special handling for dynamo DB exceptions
+        return translateDynamoDBException(message,
+            (AmazonDynamoDBException)exception);
+      }
       IOException ioe;
       AmazonServiceException ase = (AmazonServiceException) exception;
       // this exception is non-null if the service exception is an s3 one
@@ -151,9 +180,11 @@ public final class S3AUtils {
           ? (AmazonS3Exception) ase
           : null;
       int status = ase.getStatusCode();
+      message = message + ":" + ase.getErrorCode();
       switch (status) {
 
       case 301:
+      case 307:
         if (s3Exception != null) {
           if (s3Exception.getAdditionalDetails() != null &&
               s3Exception.getAdditionalDetails().containsKey(ENDPOINT_KEY)) {
@@ -163,11 +194,16 @@ public final class S3AUtils {
                 + "the bucket.",
                 s3Exception.getAdditionalDetails().get(ENDPOINT_KEY), 
ENDPOINT);
           }
-          ioe = new AWSS3IOException(message, s3Exception);
+          ioe = new AWSRedirectException(message, s3Exception);
         } else {
-          ioe = new AWSServiceIOException(message, ase);
+          ioe = new AWSRedirectException(message, ase);
         }
         break;
+
+      case 400:
+        ioe = new AWSBadRequestException(message, ase);
+        break;
+
       // permissions
       case 401:
       case 403:
@@ -186,6 +222,25 @@ public final class S3AUtils {
       // a shorter one while it is being read.
       case 416:
         ioe = new EOFException(message);
+        ioe.initCause(ase);
+        break;
+
+      // this has surfaced as a "no response from server" message.
+      // so rare we haven't replicated it.
+      // Treating as an idempotent proxy error.
+      case 443:
+      case 444:
+        ioe = new AWSNoResponseException(message, ase);
+        break;
+
+      // throttling
+      case 503:
+        ioe = new AWSServiceThrottledException(message, ase);
+        break;
+
+      // internal error
+      case 500:
+        ioe = new AWSStatus500Exception(message, ase);
         break;
 
       default:
@@ -226,21 +281,99 @@ public final class S3AUtils {
    * Recurse down the exception loop looking for any inner details about
    * an interrupted exception.
    * @param thrown exception thrown
-   * @return true if down the execution chain the operation was an interrupt
+   * @return the actual exception if the operation was an interrupt
    */
-  static boolean containsInterruptedException(Throwable thrown) {
+  static Exception containsInterruptedException(Throwable thrown) {
     if (thrown == null) {
-      return false;
+      return null;
     }
     if (thrown instanceof InterruptedException ||
-        thrown instanceof InterruptedIOException) {
-      return true;
+        thrown instanceof InterruptedIOException ||
+        thrown instanceof AbortedException) {
+      return (Exception)thrown;
     }
     // tail recurse
     return containsInterruptedException(thrown.getCause());
   }
 
   /**
+   * Handles translation of interrupted exception. This includes
+   * preserving the class of the fault for better retry logic
+   * @param exception outer exception
+   * @param innerCause inner cause (which is guaranteed to be some form
+   * of interrupted exception
+   * @param message message for the new exception.
+   * @return an IOE which can be rethrown
+   */
+  private static InterruptedIOException translateInterruptedException(
+      SdkBaseException exception,
+      final Exception innerCause,
+      String message) {
+    InterruptedIOException ioe;
+    if (innerCause instanceof SocketTimeoutException) {
+      ioe = new SocketTimeoutException(message);
+    } else {
+      String name = innerCause.getClass().getName();
+      if (name.endsWith(".ConnectTimeoutException")
+          || name.endsWith("$ConnectTimeoutException")) {
+        // TCP connection http timeout from the shaded or unshaded filenames
+        // com.amazonaws.thirdparty.apache.http.conn.ConnectTimeoutException
+        ioe = new ConnectTimeoutException(message);
+      } else {
+        // any other exception
+        ioe = new InterruptedIOException(message);
+      }
+    }
+    ioe.initCause(exception);
+    return ioe;
+  }
+
+  /**
+   * Is the exception an instance of a throttling exception. That
+   * is an AmazonServiceException with a 503 response, any
+   * exception from DynamoDB for limits exceeded, or an
+   * {@link AWSServiceThrottledException}.
+   * @param ex exception to examine
+   * @return true if it is considered a throttling exception
+   */
+  public static boolean isThrottleException(Exception ex) {
+    return ex instanceof AWSServiceThrottledException
+        || ex instanceof ProvisionedThroughputExceededException
+        || ex instanceof LimitExceededException
+        || (ex instanceof AmazonServiceException
+            && 503  == ((AmazonServiceException)ex).getStatusCode());
+  }
+
+  /**
+   * Cue that an AWS exception is likely to be an EOF Exception based
+   * on the message coming back from an XML/JSON parser. This is likely
+   * to be brittle, so only a hint.
+   * @param ex exception
+   * @return true if this is believed to be a sign the connection was broken.
+   */
+  public static boolean signifiesConnectionBroken(SdkBaseException ex) {
+    return ex.toString().contains(EOF_MESSAGE_IN_XML_PARSER);
+  }
+
+  /**
+   * Translate a DynamoDB exception into an IOException.
+   * @param message preformatted message for the exception
+   * @param ex exception
+   * @return an exception to throw.
+   */
+  public static IOException translateDynamoDBException(String message,
+      AmazonDynamoDBException ex) {
+    if (isThrottleException(ex)) {
+      return new AWSServiceThrottledException(message, ex);
+    }
+    if (ex instanceof ResourceNotFoundException) {
+      return (FileNotFoundException) new FileNotFoundException(message)
+          .initCause(ex);
+    }
+    return new AWSServiceIOException(message, ex);
+  }
+
+  /**
    * Get low level details of an amazon exception for logging; multi-line.
    * @param e exception
    * @return string details
@@ -587,7 +720,7 @@ public final class S3AUtils {
   }
 
   /**
-   * Get a long option >= the minimum allowed value, supporting memory
+   * Get a long option &gt;= the minimum allowed value, supporting memory
    * prefixes K,M,G,T,P.
    * @param conf configuration
    * @param key key to look up
@@ -596,7 +729,7 @@ public final class S3AUtils {
    * @return the value
    * @throws IllegalArgumentException if the value is below the minimum
    */
-  static long longBytesOption(Configuration conf,
+  public static long longBytesOption(Configuration conf,
                              String key,
                              long defVal,
                              long min) {
@@ -746,6 +879,133 @@ public final class S3AUtils {
     return dest;
   }
 
+
+  /**
+   * Delete a path quietly: failures are logged at DEBUG.
+   * @param fs filesystem
+   * @param path path
+   * @param recursive recursive?
+   */
+  public static void deleteQuietly(FileSystem fs,
+      Path path,
+      boolean recursive) {
+    try {
+      fs.delete(path, recursive);
+    } catch (IOException e) {
+      LOG.debug("Failed to delete {}", path, e);
+    }
+  }
+
+  /**
+   * Delete a path: failures are logged at WARN.
+   * @param fs filesystem
+   * @param path path
+   * @param recursive recursive?
+   */
+  public static void deleteWithWarning(FileSystem fs,
+      Path path,
+      boolean recursive) {
+    try {
+      fs.delete(path, recursive);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete {}", path, e);
+    }
+  }
+
+
+  /**
+   * An interface for use in lambda-expressions working with
+   * directory tree listings.
+   */
+  @FunctionalInterface
+  public interface CallOnLocatedFileStatus {
+    void call(LocatedFileStatus status) throws IOException;
+  }
+
+  /**
+   * An interface for use in lambda-expressions working with
+   * directory tree listings.
+   */
+  @FunctionalInterface
+  public interface LocatedFileStatusMap<T> {
+    T call(LocatedFileStatus status) throws IOException;
+  }
+
+  /**
+   * Apply an operation to every {@link LocatedFileStatus} in a remote
+   * iterator.
+   * @param iterator iterator from a list
+   * @param eval closure to evaluate
+   * @throws IOException anything in the closure, or iteration logic.
+   */
+  public static void applyLocatedFiles(
+      RemoteIterator<LocatedFileStatus> iterator,
+      CallOnLocatedFileStatus eval) throws IOException {
+    while (iterator.hasNext()) {
+      eval.call(iterator.next());
+    }
+  }
+
+  /**
+   * Map an operation to every {@link LocatedFileStatus} in a remote
+   * iterator, returning a list of the results.
+   * @param iterator iterator from a list
+   * @param eval closure to evaluate
+   * @throws IOException anything in the closure, or iteration logic.
+   */
+  public static <T> List<T> mapLocatedFiles(
+      RemoteIterator<LocatedFileStatus> iterator,
+      LocatedFileStatusMap<T> eval) throws IOException {
+    final List<T> results = new ArrayList<>();
+    applyLocatedFiles(iterator,
+        (s) -> results.add(eval.call(s)));
+    return results;
+  }
+
+  /**
+   * Map an operation to every {@link LocatedFileStatus} in a remote
+   * iterator, returning a list of the all results which were not empty.
+   * @param iterator iterator from a list
+   * @param eval closure to evaluate
+   * @throws IOException anything in the closure, or iteration logic.
+   */
+  public static <T> List<T> flatmapLocatedFiles(
+      RemoteIterator<LocatedFileStatus> iterator,
+      LocatedFileStatusMap<Optional<T>> eval) throws IOException {
+    final List<T> results = new ArrayList<>();
+    applyLocatedFiles(iterator,
+        (s) -> eval.call(s).map(r -> results.add(r)));
+    return results;
+  }
+
+  /**
+   * List located files and filter them as a classic listFiles(path, filter)
+   * would do.
+   * @param fileSystem filesystem
+   * @param path path to list
+   * @param recursive recursive listing?
+   * @param filter filter for the filename
+   * @return the filtered list of entries
+   * @throws IOException IO failure.
+   */
+  public static List<LocatedFileStatus> listAndFilter(FileSystem fileSystem,
+      Path path, boolean recursive, PathFilter filter) throws IOException {
+    return flatmapLocatedFiles(fileSystem.listFiles(path, recursive),
+        status -> maybe(filter.accept(status.getPath()), status));
+  }
+
+  /**
+   * Convert a value into a non-empty Optional instance if
+   * the value of {@code include} is true.
+   * @param include flag to indicate the value is to be included.
+   * @param value value to return
+   * @param <T> type of option.
+   * @return if include is false, Optional.empty. Otherwise, the value.
+   */
+  public static <T> Optional<T> maybe(boolean include, T value) {
+    return include ? Optional.of(value) : Optional.empty();
+  }
+
   /**
    * Patch the security credential provider information in
    * {@link #CREDENTIAL_PROVIDER_PATH}
@@ -937,4 +1197,36 @@ public final class S3AUtils {
     return conf.get(FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey);
   }
 
+
+  /**
+   * Path filter which ignores any file which starts with . or _.
+   */
+  public static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+
+    @Override
+    public String toString() {
+      return "HIDDEN_FILE_FILTER";
+    }
+  };
+
+  /**
+   * A Path filter which accepts all filenames.
+   */
+  public static final PathFilter ACCEPT_ALL = new PathFilter() {
+    @Override
+    public boolean accept(Path file) {
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "ACCEPT_ALL";
+    }
+  };
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
index 6b3bd46..1a0d2c3 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
@@ -66,4 +66,15 @@ public class S3ListRequest {
   public ListObjectsV2Request getV2() {
     return v2Request;
   }
+
+  @Override
+  public String toString() {
+    if (isV1()) {
+      return String.format("List %s:/%s",
+          v1Request.getBucketName(), v1Request.getPrefix());
+    } else {
+      return String.format("List %s:/%s",
+          v2Request.getBucketName(), v2Request.getPrefix());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
index 7c73a23..d1bff8a 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
@@ -30,7 +30,7 @@ class S3ObjectAttributes {
   private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private String serverSideEncryptionKey;
 
-  public S3ObjectAttributes(
+  S3ObjectAttributes(
       String bucket,
       String key,
       S3AEncryptionMethods serverSideEncryptionAlgorithm,
@@ -41,19 +41,19 @@ class S3ObjectAttributes {
     this.serverSideEncryptionKey = serverSideEncryptionKey;
   }
 
-  public String getBucket() {
+  String getBucket() {
     return bucket;
   }
 
-  public String getKey() {
+  String getKey() {
     return key;
   }
 
-  public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
+  S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
     return serverSideEncryptionAlgorithm;
   }
 
-  public String getServerSideEncryptionKey() {
+  String getServerSideEncryptionKey() {
     return serverSideEncryptionKey;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 777c161..871d7c5 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -77,6 +77,8 @@ public enum Statistic {
       "Number of continued object listings made"),
   OBJECT_METADATA_REQUESTS("object_metadata_requests",
       "Number of requests for object metadata"),
+  OBJECT_MULTIPART_UPLOAD_INITIATED("object_multipart_initiated",
+      "Object multipart upload initiated"),
   OBJECT_MULTIPART_UPLOAD_ABORTED("object_multipart_aborted",
       "Object multipart upload aborted"),
   OBJECT_PUT_REQUESTS("object_put_requests",
@@ -142,16 +144,62 @@ public enum Statistic {
   STREAM_WRITE_QUEUE_DURATION("stream_write_queue_duration",
       "Total queue duration of all block uploads"),
 
-  // S3Guard stats
+  // S3guard committer stats
+  COMMITTER_COMMITS_CREATED(
+      "committer_commits_created",
+      "Number of files to commit created"),
+  COMMITTER_COMMITS_COMPLETED(
+      "committer_commits_completed",
+      "Number of files committed"),
+  COMMITTER_JOBS_SUCCEEDED(
+      "committer_jobs_completed",
+      "Number of successful jobs"),
+  COMMITTER_JOBS_FAILED(
+      "committer_jobs_failed",
+      "Number of failed jobs"),
+  COMMITTER_TASKS_SUCCEEDED(
+      "committer_tasks_completed",
+      "Number of successful tasks"),
+  COMMITTER_TASKS_FAILED(
+      "committer_tasks_failed",
+      "Number of failed tasks"),
+  COMMITTER_BYTES_COMMITTED(
+      "committer_bytes_committed",
+      "Amount of data committed"),
+  COMMITTER_BYTES_UPLOADED(
+      "committer_bytes_uploaded",
+      "Number of bytes uploaded duing commit operations"),
+  COMMITTER_COMMITS_FAILED(
+      "committer_commits_failed",
+      "Number of commits failed"),
+  COMMITTER_COMMITS_ABORTED(
+      "committer_commits_aborted",
+      "Number of commits aborted"),
+  COMMITTER_COMMITS_REVERTED(
+      "committer_commits_reverted",
+      "Number of commits reverted"),
+  COMMITTER_MAGIC_FILES_CREATED(
+      "committer_magic_files_created",
+      "Number of files created under 'magic' paths"),
+
+  // S3guard stats
   S3GUARD_METADATASTORE_PUT_PATH_REQUEST(
       "s3guard_metadatastore_put_path_request",
-      "s3guard metadata store put one metadata path request"),
+      "S3Guard metadata store put one metadata path request"),
   S3GUARD_METADATASTORE_PUT_PATH_LATENCY(
       "s3guard_metadatastore_put_path_latency",
-      "s3guard metadata store put one metadata path lantency"),
+      "S3Guard metadata store put one metadata path latency"),
   S3GUARD_METADATASTORE_INITIALIZATION("s3guard_metadatastore_initialization",
-      "s3guard metadata store initialization times");
+      "S3Guard metadata store initialization times"),
+  S3GUARD_METADATASTORE_RETRY("s3guard_metadatastore_retry",
+      "S3Guard metadata store retry events"),
+  S3GUARD_METADATASTORE_THROTTLED("s3guard_metadatastore_throttled",
+      "S3Guard metadata store throttled events"),
+  S3GUARD_METADATASTORE_THROTTLE_RATE(
+      "s3guard_metadatastore_throttle_rate",
+      "S3Guard metadata store throttle rate"),
 
+  STORE_IO_THROTTLED("store_io_throttled", "Requests throttled and retried");
 
   private static final Map<String, Statistic> SYMBOL_MAP =
       new HashMap<>(Statistic.values().length);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
new file mode 100644
index 0000000..b3dd4e2
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.MultipartUpload;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import com.amazonaws.services.s3.transfer.model.UploadResult;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.fs.s3a.Invoker.*;
+
+/**
+ * Helper for low-level operations against an S3 Bucket for writing data
+ * and creating and committing pending writes.
+ * <p>
+ * It hides direct access to the S3 API
+ * and is a location where the object upload process can be evolved/enhanced.
+ * <p>
+ * Features
+ * <ul>
+ *   <li>Methods to create and submit requests to S3, so avoiding
+ *   all direct interaction with the AWS APIs.</li>
+ *   <li>Some extra preflight checks of arguments, so failing fast on
+ *   errors.</li>
+ *   <li>Callbacks to let the FS know of events in the output stream
+ *   upload process.</li>
+ *   <li>Failure handling, including converting exceptions to IOEs.</li>
+ *   <li>Integration with instrumentation and S3Guard.</li>
+ * </ul>
+ *
+ * This API is for internal use only.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class WriteOperationHelper {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(WriteOperationHelper.class);
+  private final S3AFileSystem owner;
+  private final Invoker invoker;
+
+  /**
+   * Constructor.
+   * @param owner owner FS creating the helper
+   *
+   */
+  protected WriteOperationHelper(S3AFileSystem owner) {
+    this.owner = owner;
+    this.invoker = new Invoker(new S3ARetryPolicy(owner.getConf()),
+        this::operationRetried);
+  }
+
+  /**
+   * Callback from {@link Invoker} when an operation is retried.
+   * @param text text of the operation
+   * @param ex exception
+   * @param retries number of retries
+   * @param idempotent is the method idempotent
+   */
+  void operationRetried(String text, Exception ex, int retries,
+      boolean idempotent) {
+    owner.operationRetried(text, ex, retries, idempotent);
+  }
+
+  /**
+   * Execute a function with retry processing.
+   * @param action action to execute (used in error messages)
+   * @param path path of work (used in error messages)
+   * @param idempotent does the operation have semantics
+   * which mean that it can be retried even if was already executed?
+   * @param operation operation to execute
+   * @param <T> type of return value
+   * @return the result of the call
+   * @throws IOException any IOE raised, or translated exception
+   */
+  public <T> T retry(String action,
+      String path,
+      boolean idempotent,
+      Invoker.Operation<T> operation)
+      throws IOException {
+
+    return invoker.retry(action, path, idempotent, operation);
+  }
+
+  /**
+   * Create a {@link PutObjectRequest} request against the specific key.
+   * @param destKey destination key
+   * @param inputStream source data.
+   * @param length size, if known. Use -1 for not known
+   * @return the request
+   */
+  public PutObjectRequest createPutObjectRequest(String destKey,
+      InputStream inputStream, long length) {
+    return owner.newPutObjectRequest(destKey,
+        newObjectMetadata(length),
+        inputStream);
+  }
+
+  /**
+   * Create a {@link PutObjectRequest} request to upload a file.
+   * @param dest key to PUT to.
+   * @param sourceFile source file
+   * @return the request
+   */
+  public PutObjectRequest createPutObjectRequest(String dest,
+      File sourceFile) {
+    Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE,
+        "File length is too big for a single PUT upload");
+    return owner.newPutObjectRequest(dest,
+        newObjectMetadata((int) sourceFile.length()),
+        sourceFile);
+  }
+
+  /**
+   * Callback on a successful write.
+   * @param length length of the write
+   */
+  public void writeSuccessful(long length) {
+  }
+
+  /**
+   * Callback on a write failure.
+   * @param ex Any exception raised which triggered the failure.
+   */
+  public void writeFailed(Exception ex) {
+    LOG.debug("Write to {} failed", this, ex);
+  }
+
+  /**
+   * Create a new object metadata instance.
+   * Any standard metadata headers are added here, for example:
+   * encryption.
+   * @param length size, if known. Use -1 for not known
+   * @return a new metadata instance
+   */
+  public ObjectMetadata newObjectMetadata(long length) {
+    return owner.newObjectMetadata(length);
+  }
+
+  /**
+   * Start the multipart upload process.
+   * Retry policy: retrying, translated.
+   * @return the upload result containing the ID
+   * @throws IOException IO problem
+   */
+  @Retries.RetryTranslated
+  public String initiateMultiPartUpload(String destKey) throws IOException {
+    LOG.debug("Initiating Multipart upload to {}", destKey);
+    final InitiateMultipartUploadRequest initiateMPURequest =
+        new InitiateMultipartUploadRequest(owner.getBucket(),
+            destKey,
+            newObjectMetadata(-1));
+    initiateMPURequest.setCannedACL(owner.getCannedACL());
+    owner.setOptionalMultipartUploadRequestParameters(initiateMPURequest);
+
+    return retry("initiate MultiPartUpload", destKey, true,
+        () -> owner.initiateMultipartUpload(initiateMPURequest).getUploadId());
+  }
+
+  /**
+   * Finalize a multipart PUT operation.
+   * This completes the upload, and, if that works, calls
+   * {@link S3AFileSystem#finishedWrite(String, long)} to update the 
filesystem.
+   * Retry policy: retrying, translated.
+   * @param destKey destination of the commit
+   * @param uploadId multipart operation Id
+   * @param partETags list of partial uploads
+   * @param length length of the upload
+   * @param retrying retrying callback
+   * @return the result of the operation.
+   * @throws IOException on problems.
+   */
+  @Retries.RetryTranslated
+  private CompleteMultipartUploadResult finalizeMultipartUpload(
+      String destKey,
+      String uploadId,
+      List<PartETag> partETags,
+      long length,
+      Retried retrying) throws IOException {
+    return invoker.retry("Completing multipart commit", destKey,
+        true,
+        retrying,
+        () -> {
+          // a copy of the list is required, so that the AWS SDK doesn't
+          // attempt to sort an unmodifiable list.
+          CompleteMultipartUploadResult result =
+              owner.getAmazonS3Client().completeMultipartUpload(
+                  new CompleteMultipartUploadRequest(owner.getBucket(),
+                      destKey,
+                      uploadId,
+                      new ArrayList<>(partETags)));
+          owner.finishedWrite(destKey, length);
+          return result;
+        }
+    );
+  }
+
+  /**
+   * This completes a multipart upload to the destination key via
+   * {@code finalizeMultipartUpload()}.
+   * Retry policy: retrying, translated.
+   * Retries increment the {@code errorCount} counter.
+   * @param destKey destination
+   * @param uploadId multipart operation Id
+   * @param partETags list of partial uploads
+   * @param length length of the upload
+   * @param errorCount a counter incremented by 1 on every error; for
+   * use in statistics
+   * @return the result of the operation.
+   * @throws IOException if problems arose which could not be retried, or
+   * the retry count was exceeded
+   */
+  @Retries.RetryTranslated
+  public CompleteMultipartUploadResult completeMPUwithRetries(
+      String destKey,
+      String uploadId,
+      List<PartETag> partETags,
+      long length,
+      AtomicInteger errorCount)
+      throws IOException {
+    checkNotNull(uploadId);
+    checkNotNull(partETags);
+    LOG.debug("Completing multipart upload {} with {} parts",
+        uploadId, partETags.size());
+    return finalizeMultipartUpload(destKey,
+        uploadId,
+        partETags,
+        length,
+        (text, e, r, i) -> errorCount.incrementAndGet());
+  }
+
+  /**
+   * Abort a multipart upload operation.
+   * @param destKey destination key of the upload
+   * @param uploadId multipart operation Id
+   * @param retrying callback invoked on every retry
+   * @throws IOException failure to abort
+   * @throws FileNotFoundException if the abort ID is unknown
+   */
+  @Retries.RetryTranslated
+  public void abortMultipartUpload(String destKey, String uploadId,
+      Retried retrying)
+      throws IOException {
+    invoker.retry("Aborting multipart upload", destKey, true,
+        retrying,
+        () -> owner.abortMultipartUpload(
+            destKey,
+            uploadId));
+  }
+
+  /**
+   * Abort a multipart commit operation.
+   * @param upload upload to abort.
+   * @throws IOException on problems.
+   */
+  @Retries.RetryTranslated
+  public void abortMultipartUpload(MultipartUpload upload)
+      throws IOException {
+    invoker.retry("Aborting multipart commit", upload.getKey(), true,
+        () -> owner.abortMultipartUpload(upload));
+  }
+
+
+  /**
+   * Abort multipart uploads under a path: limited to the first
+   * few hundred.
+   * @param prefix prefix for uploads to abort
+   * @return a count of aborts
+   * @throws IOException trouble; FileNotFoundExceptions are swallowed.
+   */
+  @Retries.RetryTranslated
+  public int abortMultipartUploadsUnderPath(String prefix)
+      throws IOException {
+    LOG.debug("Aborting multipart uploads under {}", prefix);
+    int count = 0;
+    List<MultipartUpload> multipartUploads = 
owner.listMultipartUploads(prefix);
+    LOG.debug("Number of outstanding uploads: {}", multipartUploads.size());
+    for (MultipartUpload upload: multipartUploads) {
+      try {
+        abortMultipartUpload(upload);
+        count++;
+      } catch (FileNotFoundException e) {
+        LOG.debug("Already aborted: {}", upload.getKey(), e);
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Abort a multipart commit operation.
+   * @param destKey destination key of ongoing operation
+   * @param uploadId multipart operation Id
+   * @throws IOException on problems.
+   * @throws FileNotFoundException if the abort ID is unknown
+   */
+  @Retries.RetryTranslated
+  public void abortMultipartCommit(String destKey, String uploadId)
+      throws IOException {
+    abortMultipartUpload(destKey, uploadId, invoker.getRetryCallback());
+  }
+
+  /**
+   * Create and initialize a part request of a multipart upload.
+   * Exactly one of: {@code uploadStream} or {@code sourceFile}
+   * must be specified.
+   * A subset of the file may be posted, by providing the starting point
+   * in {@code offset} and a length of block in {@code size} equal to
+   * or less than the remaining bytes.
+   * @param destKey destination key of ongoing operation
+   * @param uploadId ID of ongoing upload
+   * @param partNumber current part number of the upload
+   * @param size amount of data
+   * @param uploadStream source of data to upload
+   * @param sourceFile optional source file.
+   * @param offset offset in file to start reading.
+   * @return the request.
+   */
+  public UploadPartRequest newUploadPartRequest(
+      String destKey,
+      String uploadId,
+      int partNumber,
+      int size,
+      InputStream uploadStream,
+      File sourceFile,
+      Long offset) {
+    checkNotNull(uploadId);
+    // exactly one source must be set; xor verifies this
+    checkArgument((uploadStream != null) ^ (sourceFile != null),
+        "Data source");
+    checkArgument(size >= 0, "Invalid partition size %s", size);
+    checkArgument(partNumber > 0 && partNumber <= 10000,
+        "partNumber must be between 1 and 10000 inclusive, but is %s",
+        partNumber);
+
+    LOG.debug("Creating part upload request for {} #{} size {}",
+        uploadId, partNumber, size);
+    UploadPartRequest request = new UploadPartRequest()
+        .withBucketName(owner.getBucket())
+        .withKey(destKey)
+        .withUploadId(uploadId)
+        .withPartNumber(partNumber)
+        .withPartSize(size);
+    if (uploadStream != null) {
+      // there's an upload stream. Bind to it.
+      request.setInputStream(uploadStream);
+    } else {
+      checkArgument(sourceFile.exists(),
+          "Source file does not exist: %s", sourceFile);
+      checkArgument(offset >= 0, "Invalid offset %s", offset);
+      long length = sourceFile.length();
+      checkArgument(offset == 0 || offset < length,
+          "Offset %s beyond length of file %s", offset, length);
+      request.setFile(sourceFile);
+      request.setFileOffset(offset);
+    }
+    return request;
+  }
+
+  /**
+   * The toString method is intended to be used in logging/toString calls.
+   * @return a string description.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "WriteOperationHelper {bucket=").append(owner.getBucket());
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * PUT an object directly (i.e. not via the transfer manager).
+   * Byte length is calculated from the file length, or, if there is no
+   * file, from the content length of the header.
+   * @param putObjectRequest the request
+   * @return the upload initiated
+   * @throws IOException on problems
+   */
+  @Retries.RetryTranslated
+  public PutObjectResult putObject(PutObjectRequest putObjectRequest)
+      throws IOException {
+    return retry("put",
+        putObjectRequest.getKey(), true,
+        () -> owner.putObjectDirect(putObjectRequest));
+  }
+
+  /**
+   * PUT an object via the transfer manager.
+   * @param putObjectRequest the request
+   * @return the result of the operation
+   * @throws IOException on problems
+   */
+  @Retries.OnceTranslated
+  public UploadResult uploadObject(PutObjectRequest putObjectRequest)
+      throws IOException {
+    // no retry; rely on xfer manager logic
+    return retry("put",
+        putObjectRequest.getKey(), true,
+        () -> owner.executePut(putObjectRequest, null));
+  }
+
+  /**
+   * Revert a commit by deleting the file.
+   * Relies on retry code in filesystem
+   * @throws IOException on problems
+   * @param destKey destination key
+   */
+  @Retries.RetryTranslated
+  public void revertCommit(String destKey) throws IOException {
+    once("revert commit", destKey,
+        () -> {
+          Path destPath = owner.keyToQualifiedPath(destKey);
+          owner.deleteObjectAtPath(destPath,
+              destKey, true);
+          owner.maybeCreateFakeParentDirectory(destPath);
+        }
+    );
+  }
+
+  /**
+   * Upload part of a multi-partition file.
+   * @param request request
+   * @return the result of the operation.
+   * @throws IOException on problems
+   */
+  @Retries.RetryTranslated
+  public UploadPartResult uploadPart(UploadPartRequest request)
+      throws IOException {
+    return retry("upload part",
+        request.getKey(),
+        true,
+        () -> owner.uploadPart(request));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
new file mode 100644
index 0000000..3277916
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.amazonaws.services.s3.model.MultipartUpload;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.net.NetUtils;
+
+import static org.apache.hadoop.fs.s3a.Invoker.ignoreIOExceptions;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
+
+/**
+ * Abstract base class for S3A committers; allows for any commonality
+ * between different architectures.
+ *
+ * Although the committer APIs allow for a committer to be created without
+ * an output path, this is not supported in this class or its subclasses:
+ * a destination must be supplied. It is left to the committer factory
+ * to handle the creation of a committer when the destination is unknown.
+ *
+ * Requiring an output directory simplifies coding and testing.
+ */
+public abstract class AbstractS3ACommitter extends PathOutputCommitter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractS3ACommitter.class);
+
+  /**
+   * Thread pool for task execution.
+   */
+  private ExecutorService threadPool;
+
+  /** Underlying commit operations. */
+  private final CommitOperations commitOperations;
+
+  /**
+   * Final destination of work.
+   */
+  private Path outputPath;
+
+  /**
+   * Role: used in log/text messages.
+   */
+  private final String role;
+
+  /**
+   * This is the directory for all intermediate work: where the output format
+   * will write data.
+   * <i>This may not be on the final file system</i>
+   */
+  private Path workPath;
+
+  /** Configuration of the job. */
+  private Configuration conf;
+
+  /** Filesystem of {@link #outputPath}. */
+  private FileSystem destFS;
+
+  /** The job context. For a task, this can be cast to a TaskContext. */
+  private final JobContext jobContext;
+
+  /** Should a job marker be created? */
+  private final boolean createJobMarker;
+
+  /**
+   * Create a committer.
+   * This constructor binds the destination directory and configuration, but
+   * does not update the work path: That must be calculated by the
+   * implementation;
+   * It is omitted here to avoid subclass methods being called too early.
+   * @param outputPath the job's output path: MUST NOT be null.
+   * @param context the task's context
+   * @throws IOException on a failure
+   */
+  protected AbstractS3ACommitter(
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    Preconditions.checkArgument(outputPath != null, "null output path");
+    Preconditions.checkArgument(context != null, "null job context");
+    this.jobContext = context;
+    this.role = "Task committer " + context.getTaskAttemptID();
+    setConf(context.getConfiguration());
+    initOutput(outputPath);
+    LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
+        role, jobName(context), jobIdString(context), outputPath);
+    S3AFileSystem fs = getDestS3AFS();
+    createJobMarker = context.getConfiguration().getBoolean(
+        CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+        DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
+    commitOperations = new CommitOperations(fs);
+  }
+
+  /**
+   * Init the output filesystem and path.
+   * TESTING ONLY; allows mock FS to cheat.
+   * @param out output path
+   * @throws IOException failure to create the FS.
+   */
+  @VisibleForTesting
+  protected void initOutput(Path out) throws IOException {
+    FileSystem fs = getDestinationFS(out, getConf());
+    setDestFS(fs);
+    setOutputPath(fs.makeQualified(out));
+  }
+
+  /**
+   * Get the job/task context this committer was instantiated with.
+   * @return the context.
+   */
+  public final JobContext getJobContext() {
+    return jobContext;
+  }
+
+  /**
+   * Final path of output, in the destination FS.
+   * @return the path
+   */
+  @Override
+  public final Path getOutputPath() {
+    return outputPath;
+  }
+
+  /**
+   * Set the output path.
+   * @param outputPath new value
+   */
+  protected final void setOutputPath(Path outputPath) {
+    Preconditions.checkNotNull(outputPath, "Null output path");
+    this.outputPath = outputPath;
+  }
+
+  /**
+   * This is the critical method for {@code FileOutputFormat}; it declares
+   * the path for work.
+   * @return the working path.
+   */
+  @Override
+  public Path getWorkPath() {
+    return workPath;
+  }
+
+  /**
+   * Set the work path for this committer.
+   * @param workPath the work path to use.
+   */
+  protected void setWorkPath(Path workPath) {
+    LOG.debug("Setting work path to {}", workPath);
+    this.workPath = workPath;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  protected void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get the destination FS, creating it on demand if needed.
+   * @return the filesystem; requires the output path to be set up
+   * @throws IOException if the FS cannot be instantiated.
+   */
+  public FileSystem getDestFS() throws IOException {
+    if (destFS == null) {
+      FileSystem fs = getDestinationFS(outputPath, getConf());
+      setDestFS(fs);
+    }
+    return destFS;
+  }
+
+  /**
+   * Get the destination as an S3A Filesystem; casting it.
+   * @return the dest S3A FS.
+   * @throws IOException if the FS cannot be instantiated.
+   */
+  public S3AFileSystem getDestS3AFS() throws IOException {
+    return (S3AFileSystem) getDestFS();
+  }
+
+  /**
+   * Set the destination FS: the FS of the final output.
+   * @param destFS destination FS.
+   */
+  protected void setDestFS(FileSystem destFS) {
+    this.destFS = destFS;
+  }
+
+  /**
+   * Compute the path where the output of a given job attempt will be placed.
+   * @param context the context of the job.  This is used to get the
+   * application attempt ID.
+   * @return the path to store job attempt data.
+   */
+  public Path getJobAttemptPath(JobContext context) {
+    return getJobAttemptPath(getAppAttemptId(context));
+  }
+
+  /**
+   * Compute the path where the output of a given job attempt will be placed.
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path to store job attempt data.
+   */
+  protected abstract Path getJobAttemptPath(int appAttemptId);
+
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed. This may be the normal Task attempt path
+   * or it may be a subdirectory.
+   * The default implementation returns the value of
+   * {@link #getBaseTaskAttemptPath(TaskAttemptContext)};
+   * subclasses may return different values.
+   * @param context the context of the task attempt.
+   * @return the path where a task attempt should be stored.
+   */
+  public Path getTaskAttemptPath(TaskAttemptContext context) {
+    return getBaseTaskAttemptPath(context);
+  }
+
+  /**
+   * Compute the base path where the output of a task attempt is written.
+   * This is the path which will be deleted when a task is cleaned up and
+   * aborted.
+   *
+   * @param context the context of the task attempt.
+   * @return the path where a task attempt should be stored.
+   */
+  protected abstract Path getBaseTaskAttemptPath(TaskAttemptContext context);
+
+  /**
+   * Get a temporary directory for data. When a task is aborted/cleaned
+   * up, the contents of this directory are all deleted.
+   * @param context task context
+   * @return a path for temporary data.
+   */
+  public abstract Path getTempTaskAttemptPath(TaskAttemptContext context);
+
+  /**
+   * Get the name of this committer.
+   * @return the committer name.
+   */
+  public abstract String getName();
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "AbstractS3ACommitter{");
+    sb.append("role=").append(role);
+    sb.append(", name").append(getName());
+    sb.append(", outputPath=").append(getOutputPath());
+    sb.append(", workPath=").append(workPath);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Get the destination filesystem from the output path and the configuration.
+   * @param out output path
+   * @param config job/task config
+   * @return the associated FS
+   * @throws PathCommitException output path isn't to an S3A FS instance.
+   * @throws IOException failure to instantiate the FS.
+   */
+  protected FileSystem getDestinationFS(Path out, Configuration config)
+      throws IOException {
+    return getS3AFileSystem(out, config,
+        requiresDelayedCommitOutputInFileSystem());
+  }
+
+  /**
+   * Flag to indicate whether or not the destination filesystem needs
+   * to be configured to support magic paths where the output isn't immediately
+   * visible. If the committer returns true, then committer setup will
+   * fail if the FS doesn't have the capability.
+   * Base implementation returns false.
+   * @return what the requirements of the committer are of the filesystem.
+   */
+  protected boolean requiresDelayedCommitOutputInFileSystem() {
+    return false;
+  }
+  /**
+   * Task recovery considered unsupported: Warn and fail.
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException always.
+   */
+  @Override
+  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
+    LOG.warn("Cannot recover task {}", taskContext.getTaskAttemptID());
+    throw new PathCommitException(outputPath,
+        String.format("Unable to recover task %s",
+        taskContext.getTaskAttemptID()));
+  }
+
+  /**
+   * if the job requires a success marker on a successful job,
+   * create the file {@link CommitConstants#_SUCCESS}.
+   *
+   * While the classic committers create a 0-byte file, the S3Guard committers
+   * PUT up a the contents of a {@link SuccessData} file.
+   * @param context job context
+   * @param pending the pending commits
+   * @throws IOException IO failure
+   */
+  protected void maybeCreateSuccessMarkerFromCommits(JobContext context,
+      List<SinglePendingCommit> pending) throws IOException {
+    List<String> filenames = new ArrayList<>(pending.size());
+    for (SinglePendingCommit commit : pending) {
+      String key = commit.getDestinationKey();
+      if (!key.startsWith("/")) {
+        // fix up so that FS.makeQualified() sets up the path OK
+        key = "/" + key;
+      }
+      filenames.add(key);
+    }
+    maybeCreateSuccessMarker(context, filenames);
+  }
+
+  /**
+   * if the job requires a success marker on a successful job,
+   * create the file {@link CommitConstants#_SUCCESS}.
+   *
+   * While the classic committers create a 0-byte file, the S3Guard committers
+   * PUT up a the contents of a {@link SuccessData} file.
+   * @param context job context
+   * @param filenames list of filenames.
+   * @throws IOException IO failure
+   */
+  protected void maybeCreateSuccessMarker(JobContext context,
+      List<String> filenames)
+      throws IOException {
+    if (createJobMarker) {
+      // create a success data structure and then save it
+      SuccessData successData = new SuccessData();
+      successData.setCommitter(getName());
+      successData.setDescription(getRole());
+      successData.setHostname(NetUtils.getLocalHostname());
+      Date now = new Date();
+      successData.setTimestamp(now.getTime());
+      successData.setDate(now.toString());
+      successData.setFilenames(filenames);
+      commitOperations.createSuccessMarker(getOutputPath(), successData, true);
+    }
+  }
+
+  /**
+   * Base job setup deletes the success marker.
+   * TODO: Do we need this?
+   * @param context context
+   * @throws IOException IO failure
+   */
+/*
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    if (createJobMarker) {
+      try (DurationInfo d = new DurationInfo("Deleting _SUCCESS marker")) {
+        commitOperations.deleteSuccessMarker(getOutputPath());
+      }
+    }
+  }
+*/
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s",
+        context.getTaskAttemptID())) {
+      Path taskAttemptPath = getTaskAttemptPath(context);
+      FileSystem fs = getTaskAttemptFilesystem(context);
+      fs.mkdirs(taskAttemptPath);
+    }
+  }
+
+  /**
+   * Get the task attempt path filesystem. This may not be the same as the
+   * final destination FS, and so may not be an S3A FS.
+   * @param context task attempt
+   * @return the filesystem
+   * @throws IOException failure to instantiate
+   */
+  protected FileSystem getTaskAttemptFilesystem(TaskAttemptContext context)
+      throws IOException {
+    return getTaskAttemptPath(context).getFileSystem(getConf());
+  }
+
+  /**
+   * Commit a list of pending uploads.
+   * @param context job context
+   * @param pending list of pending uploads
+   * @throws IOException on any failure
+   */
+  protected void commitPendingUploads(JobContext context,
+      List<SinglePendingCommit> pending) throws IOException {
+    if (pending.isEmpty()) {
+      LOG.warn("{}: No pending uploads to commit", getRole());
+    }
+    LOG.debug("{}: committing the output of {} task(s)",
+        getRole(), pending.size());
+    Tasks.foreach(pending)
+        .stopOnFailure()
+        .executeWith(buildThreadPool(context))
+        .onFailure((commit, exception) ->
+                getCommitOperations().abortSingleCommit(commit))
+        .abortWith(commit -> getCommitOperations().abortSingleCommit(commit))
+        .revertWith(commit -> getCommitOperations().revertCommit(commit))
+        .run(commit -> getCommitOperations().commitOrFail(commit));
+  }
+
+  /**
+   * Try to read every pendingset file and build a list of them/
+   * In the case of a failure to read the file, exceptions are held until all
+   * reads have been attempted.
+   * @param context job context
+   * @param suppressExceptions whether to suppress exceptions.
+   * @param fs job attempt fs
+   * @param pendingCommitFiles list of files found in the listing scan
+   * @return the list of commits
+   * @throws IOException on a failure when suppressExceptions is false.
+   */
+  protected List<SinglePendingCommit> loadPendingsetFiles(
+      JobContext context,
+      boolean suppressExceptions,
+      FileSystem fs,
+      Iterable<? extends FileStatus> pendingCommitFiles) throws IOException {
+
+    final List<SinglePendingCommit> pending = Collections.synchronizedList(
+        Lists.newArrayList());
+    Tasks.foreach(pendingCommitFiles)
+        .suppressExceptions(suppressExceptions)
+        .executeWith(buildThreadPool(context))
+        .run(pendingCommitFile ->
+          pending.addAll(
+              PendingSet.load(fs, pendingCommitFile.getPath()).getCommits())
+      );
+    return pending;
+  }
+
+  /**
+   * Internal Job commit operation: where the S3 requests are made
+   * (potentially in parallel).
+   * @param context job context
+   * @param pending pending request
+   * @throws IOException any failure
+   */
+  protected void commitJobInternal(JobContext context,
+      List<SinglePendingCommit> pending)
+      throws IOException {
+
+    commitPendingUploads(context, pending);
+  }
+
+  @Override
+  public void abortJob(JobContext context, JobStatus.State state)
+      throws IOException {
+    LOG.info("{}: aborting job {} in state {}",
+        getRole(), jobIdString(context), state);
+    // final cleanup operations
+    abortJobInternal(context, false);
+  }
+
+
+  /**
+   * The internal job abort operation; can be overridden in tests.
+   * This must clean up operations; it is called when a commit fails, as
+   * well as in an {@link #abortJob(JobContext, JobStatus.State)} call.
+   * The base implementation calls {@link #cleanup(JobContext, boolean)}
+   * @param context job context
+   * @param suppressExceptions should exceptions be suppressed?
+   * @throws IOException any IO problem raised when suppressExceptions is 
false.
+   */
+  protected void abortJobInternal(JobContext context,
+      boolean suppressExceptions)
+      throws IOException {
+    cleanup(context, suppressExceptions);
+  }
+
+  /**
+   * Abort all pending uploads to the destination directory during
+   * job cleanup operations.
+   * @param suppressExceptions should exceptions be suppressed
+   */
+  protected void abortPendingUploadsInCleanup(
+      boolean suppressExceptions) throws IOException {
+    Path dest = getOutputPath();
+    try (DurationInfo d =
+             new DurationInfo(LOG, "Aborting all pending commits under %s",
+                 dest)) {
+      CommitOperations ops = getCommitOperations();
+      List<MultipartUpload> pending = ops
+          .listPendingUploadsUnderPath(dest);
+      Tasks.foreach(pending)
+          .executeWith(buildThreadPool(getJobContext()))
+          .suppressExceptions(suppressExceptions)
+          .run(u -> ops.abortMultipartCommit(u.getKey(), u.getUploadId()));
+    }
+  }
+
+  /**
+   * Subclass-specific pre commit actions.
+   * @param context job context
+   * @param pending the pending operations
+   * @throws IOException any failure
+   */
+  protected void preCommitJob(JobContext context,
+      List<SinglePendingCommit> pending) throws IOException {
+  }
+
+  /**
+   * Commit work.
+   * This consists of two stages: precommit and commit.
+   * <p>
+   * Precommit: identify pending uploads, then allow subclasses
+   * to validate the state of the destination and the pending uploads.
+   * Any failure here triggers an abort of all pending uploads.
+   * <p>
+   * Commit internal: do the final commit sequence.
+   * <p>
+   * The final commit action is to build the {@code __SUCCESS} file entry.
+   * </p>
+   * @param context job context
+   * @throws IOException any failure
+   */
+  @Override
+  public void commitJob(JobContext context) throws IOException {
+    String id = jobIdString(context);
+    try (DurationInfo d = new DurationInfo(LOG,
+        "%s: commitJob(%s)", getRole(), id)) {
+      List<SinglePendingCommit> pending
+          = listPendingUploadsToCommit(context);
+      preCommitJob(context, pending);
+      commitJobInternal(context, pending);
+      jobCompleted(true);
+      maybeCreateSuccessMarkerFromCommits(context, pending);
+      cleanup(context, false);
+    } catch (IOException e) {
+      LOG.warn("Commit failure for job {}", id, e);
+      jobCompleted(false);
+      abortJobInternal(context, true);
+      throw e;
+    }
+  }
+
+  /**
+   * Job completion outcome; this may be subclassed in tests.
+   * @param success did the job succeed.
+   */
+  protected void jobCompleted(boolean success) {
+    getCommitOperations().jobCompleted(success);
+  }
+
+  /**
+   * Clean up any staging directories.
+   * IOEs must be caught and swallowed.
+   */
+  public abstract void cleanupStagingDirs();
+
+  /**
+   * Get the list of pending uploads for this job attempt.
+   * @param context job context
+   * @return a list of pending uploads.
+   * @throws IOException Any IO failure
+   */
+  protected abstract List<SinglePendingCommit> listPendingUploadsToCommit(
+      JobContext context)
+      throws IOException;
+
+  /**
+   * Cleanup the job context, including aborting anything pending.
+   * @param context job context
+   * @param suppressExceptions should exceptions be suppressed?
+   * @throws IOException any failure if exceptions were not suppressed.
+   */
+  protected void cleanup(JobContext context,
+      boolean suppressExceptions) throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Cleanup job %s", jobIdString(context))) {
+      abortPendingUploadsInCleanup(suppressExceptions);
+    } finally {
+      cleanupStagingDirs();
+    }
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public void cleanupJob(JobContext context) throws IOException {
+    String r = getRole();
+    String id = jobIdString(context);
+    LOG.warn("{}: using deprecated cleanupJob call for {}", r, id);
+    try (DurationInfo d = new DurationInfo(LOG, "%s: cleanup Job %s", r, id)) {
+      cleanup(context, true);
+    }
+  }
+
+  /**
+   * Execute an operation; maybe suppress any raised IOException.
+   * @param suppress should raised IOEs be suppressed?
+   * @param action action (for logging when the IOE is supressed.
+   * @param operation operation
+   * @throws IOException if operation raised an IOE and suppress == false
+   */
+  protected void maybeIgnore(
+      boolean suppress,
+      String action,
+      Invoker.VoidOperation operation) throws IOException {
+    if (suppress) {
+      ignoreIOExceptions(LOG, action, "", operation);
+    } else {
+      operation.execute();
+    }
+  }
+
+  /**
+   * Execute an operation; maybe suppress any raised IOException.
+   * @param suppress should raised IOEs be suppressed?
+   * @param action action (for logging when the IOE is suppressed.
+   * @param ex  exception
+   * @throws IOException if suppress == false
+   */
+  protected void maybeIgnore(
+      boolean suppress,
+      String action,
+      IOException ex) throws IOException {
+    if (suppress) {
+      LOG.info(action, ex);
+    } else {
+      throw ex;
+    }
+  }
+
+  /**
+   * Get the commit actions instance.
+   * Subclasses may provide a mock version of this.
+   * @return the commit actions instance to use for operations.
+   */
+  protected CommitOperations getCommitOperations() {
+    return commitOperations;
+  }
+
+  /**
+   * Used in logging and reporting to help disentangle messages.
+   * @return the committer's role.
+   */
+  protected String getRole() {
+    return role;
+  }
+
+  /**
+   * Returns an {@link ExecutorService} for parallel tasks. The number of
+   * threads in the thread-pool is set by s3.multipart.committer.num-threads.
+   * If num-threads is 0, this will return null;
+   *
+   * @param context the JobContext for this commit
+   * @return an {@link ExecutorService} or null for the number of threads
+   */
+  protected final synchronized ExecutorService buildThreadPool(
+      JobContext context) {
+
+    if (threadPool == null) {
+      int numThreads = context.getConfiguration().getInt(
+          FS_S3A_COMMITTER_THREADS,
+          DEFAULT_COMMITTER_THREADS);
+      LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
+      if (numThreads > 0) {
+        threadPool = Executors.newFixedThreadPool(numThreads,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("s3-committer-pool-%d")
+                .build());
+      } else {
+        return null;
+      }
+    }
+    return threadPool;
+  }
+
+  /**
+   * Delete the task attempt path without raising any errors.
+   * @param context task context
+   */
+  protected void deleteTaskAttemptPathQuietly(TaskAttemptContext context) {
+    Path attemptPath = getBaseTaskAttemptPath(context);
+    ignoreIOExceptions(LOG, "Delete task attempt path", attemptPath.toString(),
+        () -> deleteQuietly(
+            getTaskAttemptFilesystem(context), attemptPath, true));
+  }
+
+  /**
+   * Abort all pending uploads in the list.
+   * @param context job context
+   * @param pending pending uploads
+   * @param suppressExceptions should exceptions be suppressed
+   * @throws IOException any exception raised
+   */
+  protected void abortPendingUploads(JobContext context,
+      List<SinglePendingCommit> pending,
+      boolean suppressExceptions)
+      throws IOException {
+    if (pending == null || pending.isEmpty()) {
+      LOG.info("{}: no pending commits to abort", getRole());
+    } else {
+      try (DurationInfo d = new DurationInfo(LOG,
+          "Aborting %s uploads", pending.size())) {
+        Tasks.foreach(pending)
+            .executeWith(buildThreadPool(context))
+            .suppressExceptions(suppressExceptions)
+            .run(commit -> getCommitOperations().abortSingleCommit(commit));
+      }
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to