http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
new file mode 100644
index 0000000..55e3e37
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Relays FS calls to the mocked FS, allowing extra logging with
+ * stack traces to be included, and stubbing out other methods
+ * where needed to avoid failures.
+ *
+ * The logging is useful for tracking down why there are more calls
+ * to a method than a test would expect: changes in implementation
+ * details often trigger such false-positive test failures.
+ *
+ * This class is in the s3a package so that it has access to
+ * package-private methods of {@link S3AFileSystem}.
+ */
+public class MockS3AFileSystem extends S3AFileSystem {
+  public static final String BUCKET = "bucket-name";
+  public static final URI FS_URI = URI.create("s3a://" + BUCKET + "/");
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(MockS3AFileSystem.class);
+
+  private final S3AFileSystem mock;
+  private final Pair<StagingTestBase.ClientResults,
+      StagingTestBase.ClientErrors> outcome;
+
+  /** Log nothing: {@value}. */
+  public static final int LOG_NONE = 0;
+
+  /** Log the name of the operation and any arguments: {@value}.  */
+  public static final int LOG_NAME = 1;
+
+  /** Log the entire stack of where operations are called: {@value}.  */
+  public static final int LOG_STACK = 2;
+
+  /**
+   * This can be edited to set the log level of events through the
+   * mock FS.
+   */
+  private int logEvents = LOG_NAME;
+  private final S3AInstrumentation instrumentation =
+      new S3AInstrumentation(FS_URI);
+  private Configuration conf;
+
+  public MockS3AFileSystem(S3AFileSystem mock,
+      Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> 
outcome) {
+    this.mock = mock;
+    this.outcome = outcome;
+    setUri(FS_URI);
+    setBucket(BUCKET);
+  }
+
+  public Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors>
+      getOutcome() {
+    return outcome;
+  }
+
+  public int getLogEvents() {
+    return logEvents;
+  }
+
+  public void setLogEvents(int logEvents) {
+    this.logEvents = logEvents;
+  }
+
+  private void event(String format, Object... args) {
+    Throwable ex = null;
+    String s = String.format(format, args);
+    switch (logEvents) {
+    case LOG_STACK:
+      ex = new Exception(s);
+        /* fall through */
+    case LOG_NAME:
+      LOG.info(s, ex);
+      break;
+    case LOG_NONE:
+    default:
+      //nothing
+    }
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return new Path("s3a://" + BUCKET + "/work");
+  }
+
+  @Override
+  public void initialize(URI name, Configuration originalConf)
+      throws IOException {
+    conf = originalConf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public boolean isMagicCommitEnabled() {
+    return true;
+  }
+
+  /**
+   * Make the operation to set the S3 client public.
+   * @param client client.
+   */
+  @Override
+  public void setAmazonS3Client(AmazonS3 client) {
+    LOG.debug("Setting S3 client to {}", client);
+    super.setAmazonS3Client(client);
+  }
+
+  @Override
+  public boolean exists(Path f) throws IOException {
+    event("exists(%s)", f);
+    return mock.exists(f);
+  }
+
+  @Override
+  void finishedWrite(String key, long length) {
+
+  }
+
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    event("open(%s)", f);
+    return mock.open(f, bufferSize);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f,
+      FsPermission permission,
+      boolean overwrite,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress) throws IOException {
+    event("create(%s)", f);
+    return mock.create(f, permission, overwrite, bufferSize, replication,
+        blockSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f,
+      int bufferSize,
+      Progressable progress) throws IOException {
+    return mock.append(f, bufferSize, progress);
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    event("rename(%s, %s)", src, dst);
+    return mock.rename(src, dst);
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    event("delete(%s, %s)", f, recursive);
+    return mock.delete(f, recursive);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f)
+      throws IOException {
+    event("listStatus(%s)", f);
+    return mock.listStatus(f);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive)
+      throws IOException {
+    event("listFiles(%s, %s)", f, recursive);
+    return new EmptyIterator();
+  }
+
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    mock.setWorkingDirectory(newDir);
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    event("mkdirs(%s)", f);
+    return mock.mkdirs(f, permission);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    event("getFileStatus(%s)", f);
+    return mock.getFileStatus(f);
+  }
+
+  @Override
+  public long getDefaultBlockSize(Path f) {
+    return mock.getDefaultBlockSize(f);
+  }
+
+  @Override
+  protected void incrementStatistic(Statistic statistic) {
+  }
+
+  @Override
+  protected void incrementStatistic(Statistic statistic, long count) {
+  }
+
+  @Override
+  protected void incrementGauge(Statistic statistic, long count) {
+  }
+
+  @Override
+  public void incrementReadOperations() {
+  }
+
+  @Override
+  public void incrementWriteOperations() {
+  }
+
+  @Override
+  public void incrementPutStartStatistics(long bytes) {
+  }
+
+  @Override
+  public void incrementPutCompletedStatistics(boolean success, long bytes) {
+  }
+
+  @Override
+  public void incrementPutProgressStatistics(String key, long bytes) {
+  }
+
+  @Override
+  protected void setOptionalMultipartUploadRequestParameters(
+      InitiateMultipartUploadRequest req) {
+    // no-op
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public long getDefaultBlockSize() {
+    return mock.getDefaultBlockSize();
+  }
+
+  @Override
+  void deleteObjectAtPath(Path f, String key, boolean isFile)
+      throws AmazonClientException, IOException {
+    deleteObject(key);
+  }
+
+  @Override
+  void maybeCreateFakeParentDirectory(Path path)
+      throws IOException, AmazonClientException {
+    // no-op
+  }
+
+  private static class EmptyIterator implements
+      RemoteIterator<LocatedFileStatus> {
+    @Override
+    public boolean hasNext() throws IOException {
+      return false;
+    }
+
+    @Override
+    public LocatedFileStatus next() throws IOException {
+      return null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "MockS3AFileSystem{");
+    sb.append("inner mockFS=").append(mock);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  @Override
+  public S3AInstrumentation.CommitterStatistics newCommitterStatistics() {
+    return instrumentation.newCommitterStatistics();
+  }
+
+  @Override
+  public void operationRetried(Exception ex) {
+    /* no-op */
+  }
+}
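
For reference, a minimal sketch of how a committer test might construct and use this mock filesystem; the no-arg ClientResults/ClientErrors constructors and the stubbed exists() call here are illustrative assumptions, not part of this patch:

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MockS3AFileSystemSketch {

  public static MockS3AFileSystem newLoggingMock() throws Exception {
    // the inner mock receives the relayed FS calls
    S3AFileSystem inner = mock(S3AFileSystem.class);
    when(inner.exists(new Path("/work"))).thenReturn(true);

    // results/errors pair as supplied by the staging test harness (assumed)
    Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> outcome =
        Pair.of(new StagingTestBase.ClientResults(),
            new StagingTestBase.ClientErrors());

    MockS3AFileSystem fs = new MockS3AFileSystem(inner, outcome);
    fs.initialize(MockS3AFileSystem.FS_URI, new Configuration());
    fs.setLogEvents(MockS3AFileSystem.LOG_STACK);   // log full call stacks
    return fs;
  }
}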

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
index 4e25380..b746bfe5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.fs.s3a;
 import static org.mockito.Mockito.*;
 
 import java.net.URI;
+import java.util.ArrayList;
 
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.MultipartUploadListing;
 import com.amazonaws.services.s3.model.Region;
 
 /**
@@ -36,6 +38,12 @@ public class MockS3ClientFactory implements S3ClientFactory {
     String bucket = name.getHost();
     AmazonS3 s3 = mock(AmazonS3.class);
     when(s3.doesBucketExist(bucket)).thenReturn(true);
+    // this listing is used in startup if purging is enabled, so
+    // return a stub value
+    MultipartUploadListing noUploads = new MultipartUploadListing();
+    noUploads.setMultipartUploads(new ArrayList<>(0));
+    when(s3.listMultipartUploads(anyObject()))
+        .thenReturn(noUploads);
     when(s3.getBucketLocation(anyString()))
         .thenReturn(Region.US_West.toString());
     return s3;
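
A short sketch of how a unit test could route an S3A instance onto this mock factory so that no network calls are made; the S3_CLIENT_FACTORY_IMPL constant is the one already used elsewhere in this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.MockS3ClientFactory;
import org.apache.hadoop.fs.s3a.S3ClientFactory;

import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;

public class MockClientFactorySketch {

  /** Build a configuration whose S3 client comes from the Mockito stub above. */
  public static Configuration newMockedS3Conf() {
    Configuration conf = new Configuration();
    conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
        S3ClientFactory.class);
    return conf;
  }
}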

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 2c4f009..7f7802d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -163,4 +163,10 @@ public interface S3ATestConstants {
    */
   String CONFIGURATION_TEST_ENDPOINT =
       "test.fs.s3a.endpoint";
+
+  /**
+   * Property to set to disable caching.
+   */
+  String FS_S3A_IMPL_DISABLE_CACHE
+      = "fs.s3a.impl.disable.cache";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index b302e72..773c25a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -21,10 +21,13 @@ package org.apache.hadoop.fs.s3a;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.s3guard.DynamoDBClientFactory;
 import org.apache.hadoop.fs.s3a.s3guard.DynamoDBLocalClientFactory;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
@@ -39,9 +42,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
@@ -59,6 +66,7 @@ public final class S3ATestUtils {
    * a property has been unset.
    */
   public static final String UNSET_PROPERTY = "unset";
+  public static final int PURGE_DELAY_SECONDS = 60 * 60;
 
   /**
    * Get S3A FS name.
@@ -120,15 +128,19 @@ public final class S3ATestUtils {
     S3AFileSystem fs1 = new S3AFileSystem();
     //enable purging in tests
     if (purge) {
-      conf.setBoolean(PURGE_EXISTING_MULTIPART, true);
-      // but a long delay so that parallel multipart tests don't
+      // purge, but with a delay so that parallel multipart tests don't
       // suddenly start timing out
-      conf.setInt(PURGE_EXISTING_MULTIPART_AGE, 30 * 60);
+      enableMultipartPurge(conf, PURGE_DELAY_SECONDS);
     }
     fs1.initialize(testURI, conf);
     return fs1;
   }
 
+  public static void enableMultipartPurge(Configuration conf, int seconds) {
+    conf.setBoolean(PURGE_EXISTING_MULTIPART, true);
+    conf.setInt(PURGE_EXISTING_MULTIPART_AGE, seconds);
+  }
+
   /**
    * Create a file context for tests.
    *
@@ -287,13 +299,13 @@ public final class S3ATestUtils {
    * @return the exception, if it is of the expected class
    * @throws Exception the exception passed in.
    */
-  public static Exception verifyExceptionClass(Class clazz,
+  public static <E extends Throwable> E verifyExceptionClass(Class<E> clazz,
       Exception ex)
       throws Exception {
     if (!(ex.getClass().equals(clazz))) {
       throw ex;
     }
-    return ex;
+    return (E)ex;
   }
 
   /**
@@ -302,7 +314,7 @@ public final class S3ATestUtils {
    * @param conf configuration to patch
    */
   public static void disableFilesystemCaching(Configuration conf) {
-    conf.setBoolean("fs.s3a.impl.disable.cache", true);
+    conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, true);
   }
 
   /**
@@ -732,4 +744,112 @@ public final class S3ATestUtils {
         = (S3ABlockOutputStream) out.getWrappedStream();
     return blockOutputStream.getStatistics();
   }
+
+  /**
+   * Read in a file and convert its contents to a string.
+   * @param fs filesystem
+   * @param path path to read
+   * @return the bytes read and converted to a string
+   * @throws IOException IO problems
+   */
+  public static String read(FileSystem fs,
+      Path path) throws IOException {
+    FileStatus status = fs.getFileStatus(path);
+    try (FSDataInputStream in = fs.open(path)) {
+      byte[] buf = new byte[(int)status.getLen()];
+      in.readFully(0, buf);
+      return new String(buf);
+    }
+  }
+
+  /**
+   * List a directory, logging each entry.
+   * @param fileSystem FS
+   * @param path path to list
+   * @param recursive do a recursive listing?
+   * @throws Exception failure.
+   */
+  public static void lsR(FileSystem fileSystem, Path path, boolean recursive)
+      throws Exception {
+    if (path == null) {
+      // surfaces when someone calls getParent() on something at the top
+      // of the path
+      LOG.info("Empty path");
+      return;
+    }
+    S3AUtils.applyLocatedFiles(fileSystem.listFiles(path, recursive),
+        (status) -> LOG.info("  {}", status));
+  }
+
+  /**
+   * Turn on the inconsistent S3A FS client in a configuration,
+   * with 100% probability of inconsistency, default delays.
+   * For this to go live, the paths must include the element
+   * {@link InconsistentAmazonS3Client#DEFAULT_DELAY_KEY_SUBSTRING}.
+   * @param conf configuration to patch
+   * @param delay delay in millis
+   */
+  public static void enableInconsistentS3Client(Configuration conf,
+      long delay) {
+    LOG.info("Enabling inconsistent S3 client");
+    conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
+        S3ClientFactory.class);
+    conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
+    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, delay);
+    conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 0.0f);
+    conf.setFloat(FAIL_INJECT_THROTTLE_PROBABILITY, 0.0f);
+  }
+
+  /**
+   * Is the filesystem using the inconsistent/throttling/unreliable client?
+   * @param fs filesystem
+   * @return true if the filesystem's client is the inconsistent one.
+   */
+  public static boolean isFaultInjecting(S3AFileSystem fs) {
+    return fs.getAmazonS3Client() instanceof InconsistentAmazonS3Client;
+  }
+
+  /**
+   * Skip a test because the client is using fault injection.
+   * This should only be done for those tests which are measuring the cost
+   * of operations or otherwise cannot handle retries.
+   * @param fs filesystem to check
+   */
+  public static void skipDuringFaultInjection(S3AFileSystem fs) {
+    Assume.assumeFalse("Skipping as filesystem has fault injection",
+        isFaultInjecting(fs));
+  }
+
+  /**
+   * Date format used for mapping upload initiation time to human string.
+   */
+  private static final DateFormat LISTING_FORMAT = new SimpleDateFormat(
+      "yyyy-MM-dd HH:mm:ss");
+
+  /**
+   * Get a list of all pending uploads under a prefix, one which can be printed.
+   * @param fs filesystem to list
+   * @param prefix prefix to look under
+   * @return possibly empty list
+   * @throws IOException IO failure.
+   */
+  public static List<String> listMultipartUploads(S3AFileSystem fs,
+      String prefix) throws IOException {
+
+    return fs
+        .listMultipartUploads(prefix).stream()
+        .map(upload -> String.format("Upload to %s with ID %s; initiated %s",
+            upload.getKey(),
+            upload.getUploadId(),
+            LISTING_FORMAT.format(upload.getInitiated())))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Skip a test if the FS isn't marked as supporting magic commits.
+   * @param fs filesystem
+   */
+  public static void assumeMagicCommitEnabled(S3AFileSystem fs) {
+    assume("Magic commit option disabled on " + fs,
+        fs.hasCapability(CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER));
+  }
+
 }
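
As a sketch of what the generified verifyExceptionClass() now permits, a caller gets back a correctly-typed exception and can assert on it directly; the exception instance below is illustrative only:

import java.io.FileNotFoundException;

import org.apache.hadoop.fs.s3a.S3ATestUtils;

public class VerifyExceptionClassSketch {

  public static void demo() throws Exception {
    Exception raised = new FileNotFoundException("s3a://bucket/missing");
    // rethrows if the class does not match exactly; otherwise returns it typed
    FileNotFoundException fnfe =
        S3ATestUtils.verifyExceptionClass(FileNotFoundException.class, raised);
    assert fnfe.getMessage().contains("missing");
  }
}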

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/StorageStatisticsTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/StorageStatisticsTracker.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/StorageStatisticsTracker.java
new file mode 100644
index 0000000..7d17d69
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/StorageStatisticsTracker.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.base.Joiner;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.StorageStatistics;
+
+/**
+ * Class to track the storage statistics of a filesystem and generate diffs
+ * between snapshots.
+ */
+public class StorageStatisticsTracker {
+
+  private final FileSystem fs;
+  private Map<String, Long> stats;
+
+  public StorageStatisticsTracker(FileSystem fs) {
+    this.fs = fs;
+    mark();
+  }
+
+  public void mark() {
+    stats = snapshot();
+  }
+
+  public Map<String, Long> compare(Map<String, Long> current) {
+    Map<String, Long> diff = new HashMap<>(stats.size());
+    for (Map.Entry<String, Long> entry : stats.entrySet()) {
+      String key = entry.getKey();
+      Long latest = current.get(key);
+      if (latest != null && !latest.equals(entry.getValue())) {
+        diff.put(key, entry.getValue() - latest);
+      }
+    }
+    return diff;
+  }
+
+  public Map<String, Long> compareToCurrent() {
+    return compare(snapshot());
+  }
+
+  public String toString(Map<String, Long> map) {
+    return Joiner.on("\n").withKeyValueSeparator("=").join(map);
+  }
+
+  public Map<String, Long> snapshot() {
+    StatsIterator values = latestValues();
+    Map<String, Long> snapshot = new HashMap<>(
+        stats == null ? 0 : stats.size());
+    for (StorageStatistics.LongStatistic value : values) {
+      snapshot.put(value.getName(), value.getValue());
+    }
+    return snapshot;
+  }
+
+  public StatsIterator latestValues() {
+    return new StatsIterator(fs.getStorageStatistics());
+  }
+
+  /**
+   * Provide an iterator to the stats.
+   */
+  public static class StatsIterator implements
+      Iterable<StorageStatistics.LongStatistic> {
+    private final StorageStatistics statistics;
+
+    public StatsIterator(StorageStatistics statistics) {
+      this.statistics = statistics;
+    }
+
+    @Override
+    public Iterator<StorageStatistics.LongStatistic> iterator() {
+      return statistics.getLongStatistics();
+    }
+  }
+
+
+}
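
A sketch of the intended mark/compare cycle against any Hadoop FileSystem exposing storage statistics; the local filesystem and path used here are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.StorageStatisticsTracker;

public class StatisticsTrackerSketch {

  public static void demo() throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    StorageStatisticsTracker tracker = new StorageStatisticsTracker(fs);
    tracker.mark();                       // baseline snapshot of the counters
    fs.exists(new Path("/tmp"));          // perform some filesystem work
    // print only the statistics which changed since mark()
    System.out.println(tracker.toString(tracker.compareToCurrent()));
  }
}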

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInvoker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInvoker.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInvoker.java
new file mode 100644
index 0000000..d29e2df
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInvoker.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkBaseException;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.ConnectTimeoutException;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.Invoker.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.verifyExceptionClass;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+
+/**
+ * Test the {@link Invoker} code and the associated {@link S3ARetryPolicy}.
+ *
+ * Some of the tests look at how connection timeout exceptions are processed.
+ * Because the AWS libraries shade these classes, there have been some
+ * regressions here during development. These tests verify that
+ * the current classname-based matching works.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+public class TestInvoker extends Assert {
+
+  /** Configuration to use for short retry intervals. */
+  private static final Configuration FAST_RETRY_CONF;
+  private static final ConnectTimeoutException
+      HADOOP_CONNECTION_TIMEOUT_EX = new ConnectTimeoutException("hadoop");
+  private static final Local.ConnectTimeoutException
+      LOCAL_CONNECTION_TIMEOUT_EX
+      = new Local.ConnectTimeoutException("local");
+  private static final org.apache.http.conn.ConnectTimeoutException
+      HTTP_CONNECTION_TIMEOUT_EX
+      = new org.apache.http.conn.ConnectTimeoutException("apache");
+  private static final SocketTimeoutException SOCKET_TIMEOUT_EX
+      = new SocketTimeoutException("socket");
+
+  /**
+   * What retry limit to use.
+   */
+  private static final int ACTIVE_RETRY_LIMIT = RETRY_LIMIT_DEFAULT;
+
+  /**
+   * A retry count guaranteed to be out of range.
+   */
+  private static final int RETRIES_TOO_MANY = ACTIVE_RETRY_LIMIT + 10;
+
+  /**
+   * A count of retry attempts guaranteed to be within the permitted range.
+   */
+  public static final int SAFE_RETRY_COUNT = 5;
+
+  static {
+    FAST_RETRY_CONF = new Configuration();
+    String interval = "10ms";
+    FAST_RETRY_CONF.set(RETRY_INTERVAL, interval);
+    FAST_RETRY_CONF.set(RETRY_THROTTLE_INTERVAL, interval);
+    FAST_RETRY_CONF.setInt(RETRY_LIMIT, ACTIVE_RETRY_LIMIT);
+    FAST_RETRY_CONF.setInt(RETRY_THROTTLE_LIMIT, ACTIVE_RETRY_LIMIT);
+  }
+
+  private static final S3ARetryPolicy RETRY_POLICY =
+      new S3ARetryPolicy(FAST_RETRY_CONF);
+
+  private int retryCount;
+  private Invoker invoker = new Invoker(RETRY_POLICY,
+      (text, e, retries, idempotent) -> retryCount++);
+  private static final AmazonClientException CLIENT_TIMEOUT_EXCEPTION =
+      new AmazonClientException(new Local.ConnectTimeoutException("timeout"));
+  private static final AmazonServiceException BAD_REQUEST = serviceException(
+      AWSBadRequestException.STATUS_CODE,
+      "bad request");
+
+  @Before
+  public void setup() {
+    resetCounters();
+  }
+
+  private static AmazonServiceException serviceException(int code,
+      String text) {
+    AmazonServiceException ex = new AmazonServiceException(text);
+    ex.setStatusCode(code);
+    return ex;
+  }
+
+  private static AmazonS3Exception createS3Exception(int code) {
+    return createS3Exception(code, "", null);
+  }
+
+  private static AmazonS3Exception createS3Exception(int code,
+      String message,
+      Throwable inner) {
+    AmazonS3Exception ex = new AmazonS3Exception(message);
+    ex.setStatusCode(code);
+    ex.initCause(inner);
+    return ex;
+  }
+
+  protected <E extends Throwable> void verifyTranslated(
+      int status,
+      Class<E> expected) throws Exception {
+    verifyTranslated(expected, createS3Exception(status));
+  }
+
+  private static <E extends Throwable> E verifyTranslated(Class<E> clazz,
+      SdkBaseException exception) throws Exception {
+    return verifyExceptionClass(clazz,
+        translateException("test", "/", exception));
+  }
+
+  private void resetCounters() {
+    retryCount = 0;
+  }
+
+  @Test
+  public void test503isThrottled() throws Exception {
+    verifyTranslated(503, AWSServiceThrottledException.class);
+  }
+
+  @Test
+  public void testS3500isStatus500Exception() throws Exception {
+    verifyTranslated(500, AWSStatus500Exception.class);
+  }
+
+  @Test
+  public void test500isStatus500Exception() throws Exception {
+    AmazonServiceException ex = new AmazonServiceException("");
+    ex.setStatusCode(500);
+    verifyTranslated(AWSStatus500Exception.class,
+        ex);
+  }
+
+  @Test(expected = org.apache.hadoop.net.ConnectTimeoutException.class)
+  public void testExtractConnectTimeoutException() throws Throwable {
+    throw extractException("", "",
+        new ExecutionException(
+            new AmazonClientException(LOCAL_CONNECTION_TIMEOUT_EX)));
+  }
+
+  @Test(expected = SocketTimeoutException.class)
+  public void testExtractSocketTimeoutException() throws Throwable {
+    throw extractException("", "",
+        new ExecutionException(
+            new AmazonClientException(SOCKET_TIMEOUT_EX)));
+  }
+
+  /**
+   * Make an assertion about a retry policy.
+   * @param text text for error message
+   * @param policy policy to check against
+   * @param expected expected action
+   * @param ex exception to analyze
+   * @param retries number of times there has already been a retry
+   * @param idempotent whether or not the operation is idempotent
+   * @throws Exception if the retry policy raises it
+   * @throws AssertionError if the returned action was not that expected.
+   */
+  private void assertRetryAction(String text,
+      S3ARetryPolicy policy,
+      RetryPolicy.RetryAction expected,
+      Exception ex,
+      int retries,
+      boolean idempotent) throws Exception {
+    RetryPolicy.RetryAction outcome = policy.shouldRetry(ex, retries, 0,
+        idempotent);
+    if (!expected.action.equals(outcome.action)) {
+      throw new AssertionError(
+          String.format(
+              "%s Expected action %s from shouldRetry(%s, %s, %s), but got"
+                  + " %s",
+              text,
+              expected, ex.toString(), retries, idempotent,
+              outcome.action),
+          ex);
+    }
+  }
+
+  @Test
+  public void testRetryThrottled() throws Throwable {
+    S3ARetryPolicy policy = RETRY_POLICY;
+    IOException ex = translateException("GET", "/", newThrottledException());
+
+    assertRetryAction("Expected retry on first throttle",
+        policy, RetryPolicy.RetryAction.RETRY,
+        ex, 0, true);
+    int retries = SAFE_RETRY_COUNT;
+    assertRetryAction("Expected retry on repeated throttle",
+        policy, RetryPolicy.RetryAction.RETRY,
+        ex, retries, true);
+    assertRetryAction("Expected retry on non-idempotent throttle",
+        policy, RetryPolicy.RetryAction.RETRY,
+        ex, retries, false);
+  }
+
+  @Test
+  public void testRetryThrottledDDB() throws Throwable {
+    assertRetryAction("Expected retry on connection timeout",
+        RETRY_POLICY, RetryPolicy.RetryAction.RETRY,
+        new ProvisionedThroughputExceededException("IOPs"), 1, false);
+  }
+
+
+  protected AmazonServiceException newThrottledException() {
+    return serviceException(
+        AWSServiceThrottledException.STATUS_CODE, "throttled");
+  }
+
+  /**
+   * Repeatedly retry until a throttle eventually stops being raised.
+   */
+  @Test
+  public void testRetryOnThrottle() throws Throwable {
+
+    final AtomicInteger counter = new AtomicInteger(0);
+    invoker.retry("test", null, false,
+        () -> {
+          if (counter.incrementAndGet() < 5) {
+            throw newThrottledException();
+          }
+        });
+  }
+
+  /**
+   * Non-idempotent operations fail on anything which isn't a throttle
+   * or connectivity problem.
+   */
+  @Test(expected = AWSBadRequestException.class)
+  public void testNoRetryOfBadRequestNonIdempotent() throws Throwable {
+    invoker.retry("test", null, false,
+        () -> {
+          throw serviceException(400, "bad request");
+        });
+  }
+
+  /**
+   * AWS nested socket problems.
+   */
+  @Test
+  public void testRetryAWSConnectivity() throws Throwable {
+    final AtomicInteger counter = new AtomicInteger(0);
+    invoker.retry("test", null, false,
+        () -> {
+          if (counter.incrementAndGet() < ACTIVE_RETRY_LIMIT) {
+            throw CLIENT_TIMEOUT_EXCEPTION;
+          }
+        });
+    assertEquals(ACTIVE_RETRY_LIMIT, counter.get());
+  }
+
+  /**
+   * Repeatedly retry until eventually a bad request succeeds.
+   */
+  @Test
+  public void testRetryBadRequestIdempotent() throws Throwable {
+    final AtomicInteger counter = new AtomicInteger(0);
+    final int attemptsBeforeSuccess = ACTIVE_RETRY_LIMIT;
+    invoker.retry("test", null, true,
+        () -> {
+          if (counter.incrementAndGet() < attemptsBeforeSuccess) {
+            throw BAD_REQUEST;
+          }
+        });
+    assertEquals(attemptsBeforeSuccess, counter.get());
+    assertEquals("retry count ", attemptsBeforeSuccess - 1, retryCount);
+  }
+
+  @Test
+  public void testConnectionRetryPolicyIdempotent() throws Throwable {
+    assertRetryAction("Expected retry on connection timeout",
+        RETRY_POLICY, RetryPolicy.RetryAction.RETRY,
+        HADOOP_CONNECTION_TIMEOUT_EX, 1, true);
+    assertRetryAction("Expected connection timeout failure",
+        RETRY_POLICY, RetryPolicy.RetryAction.FAIL,
+        HADOOP_CONNECTION_TIMEOUT_EX, RETRIES_TOO_MANY, true);
+  }
+
+  /**
+   * Even on a non-idempotent call, connection failures are considered
+   * retryable.
+   */
+  @Test
+  public void testConnectionRetryPolicyNonIdempotent() throws Throwable {
+    assertRetryAction("Expected retry on connection timeout",
+        RETRY_POLICY, RetryPolicy.RetryAction.RETRY,
+        HADOOP_CONNECTION_TIMEOUT_EX, 1, false);
+  }
+
+  /**
+   * Interrupted IOEs are not retryable.
+   */
+  @Test
+  public void testInterruptedIOExceptionRetry() throws Throwable {
+    assertRetryAction("Expected retry on connection timeout",
+        RETRY_POLICY, RetryPolicy.RetryAction.FAIL,
+        new InterruptedIOException("interrupted"), 1, false);
+  }
+
+  @Test
+  public void testUnshadedConnectionTimeoutExceptionMatching()
+      throws Throwable {
+    // connection timeout exceptions are special, but as AWS shades
+    // theirs, we need to string match them
+    verifyTranslated(ConnectTimeoutException.class,
+        new AmazonClientException(HTTP_CONNECTION_TIMEOUT_EX));
+  }
+
+  @Test
+  public void testShadedConnectionTimeoutExceptionMatching() throws Throwable {
+    // connection timeout exceptions are special, but as AWS shades
+    // theirs, we need to string match them
+    verifyTranslated(ConnectTimeoutException.class,
+        new AmazonClientException(LOCAL_CONNECTION_TIMEOUT_EX));
+  }
+
+  @Test
+  public void testShadedConnectionTimeoutExceptionNotMatching()
+      throws Throwable {
+    InterruptedIOException ex = verifyTranslated(InterruptedIOException.class,
+        new AmazonClientException(new Local.NotAConnectTimeoutException()));
+    if (ex instanceof ConnectTimeoutException) {
+      throw ex;
+    }
+  }
+
+  /**
+   * Test that NPEs aren't retried, and that the retry callback
+   * is never invoked for them.
+   */
+  @Test
+  public void testNPEsNotRetried() throws Throwable {
+    assertRetryAction("Expected NPE trigger failure",
+        RETRY_POLICY, RetryPolicy.RetryAction.FAIL,
+        new NullPointerException("oops"), 1, true);
+    // catch notification didn't see it
+    assertEquals("retry count ", 0, retryCount);
+  }
+
+  /**
+   * Container for the local exceptions, to make it obvious which
+   * specific exception class is in use.
+   */
+  private static class Local {
+
+    /**
+     * A local exception with a name to match the expected one.
+     */
+    private static class ConnectTimeoutException
+        extends InterruptedIOException {
+      ConnectTimeoutException(String s) {
+        super(s);
+      }
+    }
+
+    /**
+     * A local exception whose name should not match.
+     */
+    private static class NotAConnectTimeoutException
+        extends InterruptedIOException {
+
+    }
+
+  }
+
+  @Test
+  public void testQuietlyVoid() {
+    quietlyEval("", "",
+        () -> {
+        throw HADOOP_CONNECTION_TIMEOUT_EX;
+      });
+  }
+
+  @Test
+  public void testQuietlyEvalReturnValueSuccess() {
+    assertOptionalEquals("quietly", 3,
+        quietlyEval("", "", () -> 3));
+  }
+
+  @Test
+  public void testQuietlyEvalReturnValueFail() {
+    // use a variable so IDEs don't warn of division by zero
+    int d = 0;
+    assertOptionalUnset("quietly",
+        quietlyEval("", "", () -> 3 / d));
+  }
+
+  @Test
+  public void testDynamoDBThrottleConversion() throws Throwable {
+    ProvisionedThroughputExceededException exceededException =
+        new ProvisionedThroughputExceededException("iops");
+    AWSServiceThrottledException ddb = verifyTranslated(
+        AWSServiceThrottledException.class, exceededException);
+    assertTrue(isThrottleException(exceededException));
+    assertTrue(isThrottleException(ddb));
+    assertRetryAction("Expected throttling retry",
+        RETRY_POLICY,
+        RetryPolicy.RetryAction.RETRY,
+        ddb, SAFE_RETRY_COUNT, false);
+    // and briefly attempt an operation
+    CatchCallback catcher = new CatchCallback();
+    AtomicBoolean invoked = new AtomicBoolean(false);
+    invoker.retry("test", null, false, catcher,
+        () -> {
+          if (!invoked.getAndSet(true)) {
+            throw exceededException;
+          }
+        });
+    // to verify that the ex was translated by the time it
+    // got to the callback
+    verifyExceptionClass(AWSServiceThrottledException.class,
+        catcher.lastException);
+  }
+
+  /**
+   * Catch the exception and preserve it for later queries.
+   */
+  private static final class CatchCallback implements Retried {
+    private IOException lastException;
+    @Override
+    public void onFailure(String text,
+        IOException exception,
+        int retries,
+        boolean idempotent) {
+      lastException = exception;
+    }
+  }
+
+}
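
The retry pattern these tests exercise is, in sketch form (the operation body and callback below are placeholders; signatures follow the test code above):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;

public class InvokerRetrySketch {

  public static void demo() throws IOException {
    Invoker invoker = new Invoker(new S3ARetryPolicy(new Configuration()),
        (text, e, retries, idempotent) ->
            System.out.println("retry " + retries + " of \"" + text + "\""));
    // idempotent operation: throttling and connectivity failures are retried
    invoker.retry("sketch operation", null, true,
        () -> {
          // the S3 call would go here; exceptions drive the retry policy
        });
  }
}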

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
index e647327..39a5e3b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
@@ -32,7 +32,7 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
-import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
+import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
 import static org.apache.hadoop.fs.s3a.Listing.ProvidedFileStatusIterator;
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
index e548ac2..9b86595 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
@@ -27,6 +27,7 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
 import java.nio.file.AccessDeniedException;
 import java.util.Collections;
 import java.util.Map;
@@ -38,58 +39,87 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
 
 import org.junit.Test;
 
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+
 /**
- * Unit test suite covering translation of AWS SDK exceptions to S3A exceptions.
+ * Unit test suite covering translation of AWS SDK exceptions to S3A exceptions,
+ * and retry/recovery policies.
  */
+@SuppressWarnings("ThrowableNotThrown")
 public class TestS3AExceptionTranslation {
 
+  private static final org.apache.http.conn.ConnectTimeoutException
+      HTTP_CONNECTION_TIMEOUT_EX
+      = new org.apache.http.conn.ConnectTimeoutException("apache");
+  private static final SocketTimeoutException SOCKET_TIMEOUT_EX
+      = new SocketTimeoutException("socket");
+
   @Test
   public void test301ContainsEndpoint() throws Exception {
-    AmazonS3Exception s3Exception = createS3Exception("wrong endpoint", 301,
+    String bucket = "bucket.s3-us-west-2.amazonaws.com";
+    int sc301 = 301;
+    AmazonS3Exception s3Exception = createS3Exception("wrong endpoint", sc301,
         Collections.singletonMap(S3AUtils.ENDPOINT_KEY,
-            "bucket.s3-us-west-2.amazonaws.com"));
-    AWSS3IOException ex = (AWSS3IOException)verifyTranslated(
-        AWSS3IOException.class, s3Exception);
-    assertEquals(301, ex.getStatusCode());
+            bucket));
+    AWSRedirectException ex = verifyTranslated(
+        AWSRedirectException.class, s3Exception);
+    assertStatusCode(sc301, ex);
     assertNotNull(ex.getMessage());
-    assertTrue(ex.getMessage().contains("bucket.s3-us-west-2.amazonaws.com"));
-    assertTrue(ex.getMessage().contains(ENDPOINT));
+
+    assertContained(ex.getMessage(), bucket);
+    assertContained(ex.getMessage(), ENDPOINT);
+    assertExceptionContains(ENDPOINT, ex, "endpoint");
+    assertExceptionContains(bucket, ex, "bucket name");
+  }
+
+  protected void assertContained(String text, String contained) {
+    assertTrue("string \""+ contained + "\" not found in \"" + text + "\"",
+        text != null && text.contains(contained));
+  }
+
+  protected <E extends Throwable> void verifyTranslated(
+      int status,
+      Class<E> expected) throws Exception {
+    verifyTranslated(expected, createS3Exception(status));
+  }
+
+  @Test
+  public void test400isBad() throws Exception {
+    verifyTranslated(400, AWSBadRequestException.class);
   }
 
   @Test
   public void test401isNotPermittedFound() throws Exception {
-    verifyTranslated(AccessDeniedException.class,
-        createS3Exception(401));
+    verifyTranslated(401, AccessDeniedException.class);
   }
 
   @Test
   public void test403isNotPermittedFound() throws Exception {
-    verifyTranslated(AccessDeniedException.class,
-        createS3Exception(403));
+    verifyTranslated(403, AccessDeniedException.class);
   }
 
   @Test
   public void test404isNotFound() throws Exception {
-    verifyTranslated(FileNotFoundException.class, createS3Exception(404));
+    verifyTranslated(404, FileNotFoundException.class);
   }
 
   @Test
   public void test410isNotFound() throws Exception {
-    verifyTranslated(FileNotFoundException.class, createS3Exception(410));
+    verifyTranslated(410, FileNotFoundException.class);
   }
 
   @Test
   public void test416isEOF() throws Exception {
-    verifyTranslated(EOFException.class, createS3Exception(416));
+    verifyTranslated(416, EOFException.class);
   }
 
   @Test
   public void testGenericS3Exception() throws Exception {
     // S3 exception of no known type
-    AWSS3IOException ex = (AWSS3IOException)verifyTranslated(
+    AWSS3IOException ex = verifyTranslated(
         AWSS3IOException.class,
         createS3Exception(451));
-    assertEquals(451, ex.getStatusCode());
+    assertStatusCode(451, ex);
   }
 
   @Test
@@ -97,10 +127,19 @@ public class TestS3AExceptionTranslation {
     // service exception of no known type
     AmazonServiceException ase = new AmazonServiceException("unwind");
     ase.setStatusCode(500);
-    AWSServiceIOException ex = (AWSServiceIOException)verifyTranslated(
-        AWSServiceIOException.class,
+    AWSServiceIOException ex = verifyTranslated(
+        AWSStatus500Exception.class,
         ase);
-    assertEquals(500, ex.getStatusCode());
+    assertStatusCode(500, ex);
+  }
+
+  protected void assertStatusCode(int expected, AWSServiceIOException ex) {
+    assertNotNull("Null exception", ex);
+    if (expected != ex.getStatusCode()) {
+      throw new AssertionError("Expected status code " + expected
+          + "but got " + ex.getStatusCode(),
+          ex);
+    }
   }
 
   @Test
@@ -122,7 +161,7 @@ public class TestS3AExceptionTranslation {
     return source;
   }
 
-  private static Exception verifyTranslated(Class clazz,
+  private static <E extends Throwable> E verifyTranslated(Class<E> clazz,
       AmazonClientException exception) throws Exception {
     return verifyExceptionClass(clazz,
         translateException("test", "/", exception));
@@ -130,7 +169,8 @@ public class TestS3AExceptionTranslation {
 
   private void assertContainsInterrupted(boolean expected, Throwable thrown)
       throws Throwable {
-    if (containsInterruptedException(thrown) != expected) {
+    boolean wasInterrupted = containsInterruptedException(thrown) != null;
+    if (wasInterrupted != expected) {
       throw thrown;
     }
   }
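
In sketch form, the translation verified above maps raw AWS SDK exceptions into the typed IOException hierarchy; the 404 case mirrors test404isNotFound:

import java.io.FileNotFoundException;
import java.io.IOException;

import com.amazonaws.services.s3.model.AmazonS3Exception;

import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;

public class TranslateExceptionSketch {

  public static void demo() {
    AmazonS3Exception notFound = new AmazonS3Exception("no such key");
    notFound.setStatusCode(404);
    IOException translated =
        translateException("GET", "s3a://bucket/key", notFound);
    // a 404 from S3 surfaces to callers as a plain FileNotFoundException
    assert translated instanceof FileNotFoundException;
  }
}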

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
new file mode 100644
index 0000000..267d4df
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.amazonaws.services.s3.AmazonS3;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/**
+ * Base test suite for committer operations.
+ *
+ * By default, these tests enable the inconsistent S3A client, with
+ * a delay of {@link #CONSISTENCY_DELAY}; they may also have throttling
+ * enabled/disabled.
+ *
+ * <b>Important:</b> all filesystem probes will have to wait for
+ * the FS inconsistency delays and handle things like throttle exceptions,
+ * or disable throttling and fault injection before the probe.
+ *
+ */
+public abstract class AbstractCommitITest extends AbstractS3ATestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractCommitITest.class);
+
+  protected static final int CONSISTENCY_DELAY = 500;
+  protected static final int CONSISTENCY_PROBE_INTERVAL = 500;
+  protected static final int CONSISTENCY_WAIT = CONSISTENCY_DELAY * 2;
+
+  private InconsistentAmazonS3Client inconsistentClient;
+
+  /**
+   * Should the inconsistent S3A client be used?
+   * Default value: true.
+   * @return true for inconsistent listing
+   */
+  public boolean useInconsistentClient() {
+    return true;
+  }
+
+  /**
+   * Switch to an inconsistent path if in inconsistent mode.
+   * {@inheritDoc}
+   */
+  @Override
+  protected Path path(String filepath) throws IOException {
+    return useInconsistentClient() ?
+           super.path(InconsistentAmazonS3Client.DEFAULT_DELAY_KEY_SUBSTRING
+               + "/" + filepath)
+           : super.path(filepath);
+  }
+
+  /**
+   * Creates a configuration for commit operations: the magic committer is
+   * enabled in the FS and multipart uploads buffer to on-heap arrays.
+   * @return a configuration to use when creating an FS.
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
+    conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+    conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+    conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY);
+    if (useInconsistentClient()) {
+      enableInconsistentS3Client(conf, CONSISTENCY_DELAY);
+    }
+    return conf;
+  }
+
+  /**
+   * Get the log; can be overridden for test case log.
+   * @return a log.
+   */
+  public Logger log() {
+    return LOG;
+  }
+
+  /**
+   * Bind to the named committer.
+   *
+   * @param conf configuration
+   * @param factory factory name
+   * @param committerName committer; set if non null/empty
+   */
+  protected void bindCommitter(Configuration conf, String factory,
+      String committerName) {
+    conf.set(S3A_COMMITTER_FACTORY_KEY, factory);
+    if (StringUtils.isNotEmpty(committerName)) {
+      conf.set(FS_S3A_COMMITTER_NAME, committerName);
+    }
+  }
+
+  /**
+   * Clean up a directory.
+   * Waits for consistency if needed.
+   * @param dir directory
+   * @param conf configuration
+   * @throws IOException failure
+   */
+  public void rmdir(Path dir, Configuration conf) throws IOException {
+    if (dir != null) {
+      describe("deleting %s", dir);
+      FileSystem fs = dir.getFileSystem(conf);
+      waitForConsistency();
+      fs.delete(dir, true);
+      waitForConsistency();
+    }
+  }
+
+  /**
+   * Setup will use inconsistent client if {@link #useInconsistentClient()}
+   * is true.
+   * @throws Exception failure.
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    if (useInconsistentClient()) {
+      AmazonS3 client = getFileSystem()
+          .getAmazonS3ClientForTesting("fault injection");
+      Assert.assertTrue(
+          "AWS client is not inconsistent, even though the test requirees it "
+          + client,
+          client instanceof InconsistentAmazonS3Client);
+      inconsistentClient = (InconsistentAmazonS3Client) client;
+    }
+  }
+
+  /**
+   * Teardown waits for the consistency delay and resets failure count, so
+   * FS is stable, before the superclass teardown is called. This
+   * should clean things up better.
+   * @throws Exception failure.
+   */
+  @Override
+  public void teardown() throws Exception {
+    waitForConsistency();
+    // make sure there are no failures any more
+    resetFailures();
+    super.teardown();
+  }
+
+  /**
+   * Wait a multiple of the inconsistency delay for things to stabilize;
+   * no-op if the consistent client is used.
+   * @throws InterruptedIOException if the sleep is interrupted
+   */
+  protected void waitForConsistency() throws InterruptedIOException {
+    if (useInconsistentClient() && inconsistentClient != null) {
+      try {
+        Thread.sleep(2 * inconsistentClient.getDelayKeyMsec());
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException)
+            (new InterruptedIOException("while waiting for consistency: " + e)
+                .initCause(e));
+      }
+    }
+  }
+
+  /**
+   * Set the throttling factor on requests.
+   * @param p probability of a throttling occurring: 0-1.0
+   */
+  protected void setThrottling(float p) {
+    inconsistentClient.setThrottleProbability(p);
+  }
+
+  /**
+   * Set the throttling factor on requests and number of calls to throttle.
+   * @param p probability of a throttling occurring: 0-1.0
+   * @param limit limit to number of calls which fail
+   */
+  protected void setThrottling(float p, int limit) {
+    inconsistentClient.setThrottleProbability(p);
+    setFailureLimit(limit);
+  }
+
+  /**
+   * Turn off throttling.
+   */
+  protected void resetFailures() {
+    if (inconsistentClient != null) {
+      setThrottling(0, 0);
+    }
+  }
+
+  /**
+   * Set failure limit.
+   * @param limit limit to number of calls which fail
+   */
+  private void setFailureLimit(int limit) {
+    inconsistentClient.setFailureLimit(limit);
+  }
+
+  /**
+   * Abort all multipart uploads under a path.
+   * @param path path for uploads to abort; may be null
+   * @return a count of aborts
+   * @throws IOException trouble.
+   */
+  protected int abortMultipartUploadsUnderPath(Path path) throws IOException {
+    S3AFileSystem fs = getFileSystem();
+    if (fs != null && path != null) {
+      String key = fs.pathToKey(path);
+      WriteOperationHelper writeOps = fs.createWriteOperationHelper();
+      int count = writeOps.abortMultipartUploadsUnderPath(key);
+      if (count > 0) {
+        log().info("Multipart uploads deleted: {}", count);
+      }
+      return count;
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Assert that there *are* pending MPUs.
+   * @param path path to look under
+   * @throws IOException IO failure
+   */
+  protected void assertMultipartUploadsPending(Path path) throws IOException {
+    assertTrue("No multipart uploads in progress under " + path,
+        countMultipartUploads(path) > 0);
+  }
+
+  /**
+   * Assert that there *are no* pending MPUs; assertion failure will include
+   * the list of pending writes.
+   * @param path path to look under
+   * @throws IOException IO failure
+   */
+  protected void assertNoMultipartUploadsPending(Path path) throws IOException {
+    List<String> uploads = listMultipartUploads(getFileSystem(),
+        pathToPrefix(path));
+    if (!uploads.isEmpty()) {
+      String result = uploads.stream().collect(Collectors.joining("\n"));
+      fail("Multipart uploads in progress under " + path + " \n" + result);
+    }
+  }
+
+  /**
+   * Count the number of MPUs under a path.
+   * @param path path to scan
+   * @return count
+   * @throws IOException IO failure
+   */
+  protected int countMultipartUploads(Path path) throws IOException {
+    return countMultipartUploads(pathToPrefix(path));
+  }
+
+  /**
+   * Count the number of MPUs under a prefix; also logs them.
+   * @param prefix prefix to scan
+   * @return count
+   * @throws IOException IO failure
+   */
+  protected int countMultipartUploads(String prefix) throws IOException {
+    return listMultipartUploads(getFileSystem(), prefix).size();
+  }
+
+  /**
+   * Map from a path to a prefix.
+   * @param path path
+   * @return the key
+   */
+  private String pathToPrefix(Path path) {
+    return path == null ? "" :
+        getFileSystem().pathToKey(path);
+  }
+
+  /**
+   * Verify that the specified dir has the {@code _SUCCESS} marker
+   * and that it can be loaded.
+   * The contents will be logged and returned.
+   * @param dir directory to scan
+   * @return the loaded success data
+   * @throws IOException IO Failure
+   */
+  protected SuccessData verifySuccessMarker(Path dir) throws IOException {
+    assertPathExists("Success marker",
+        new Path(dir, _SUCCESS));
+    SuccessData successData = loadSuccessMarker(dir);
+    log().info("Success data {}", successData.toString());
+    log().info("Metrics\n{}",
+        successData.dumpMetrics("  ", " = ", "\n"));
+    log().info("Diagnostics\n{}",
+        successData.dumpDiagnostics("  ", " = ", "\n"));
+    return successData;
+  }
+
+  /**
+   * Load the success marker and return the data inside it.
+   * @param dir directory containing the marker
+   * @return the loaded data
+   * @throws IOException on any failure to load or validate the data
+   */
+  protected SuccessData loadSuccessMarker(Path dir) throws IOException {
+    return SuccessData.load(getFileSystem(), new Path(dir, _SUCCESS));
+  }
+
+  /**
+   * Read a UTF-8 file.
+   * @param path path to read
+   * @return string value
+   * @throws IOException IO failure
+   */
+  protected String readFile(Path path) throws IOException {
+    return ContractTestUtils.readUTF8(getFileSystem(), path, -1);
+  }
+
+  /**
+   * Assert that the given dir does not have the {@code _SUCCESS} marker.
+   * @param dir dir to scan
+   * @throws IOException IO Failure
+   */
+  protected void assertSuccessMarkerDoesNotExist(Path dir) throws IOException {
+    assertPathDoesNotExist("Success marker",
+        new Path(dir, _SUCCESS));
+  }
+
+  /**
+   * Closeable which can be used to safely close writers in
+   * a try-with-resources block.
+   */
+  protected static class CloseWriter implements AutoCloseable {
+
+    private final RecordWriter writer;
+    private final TaskAttemptContext context;
+
+    public CloseWriter(RecordWriter writer,
+        TaskAttemptContext context) {
+      this.writer = writer;
+      this.context = context;
+    }
+
+    @Override
+    public void close() {
+      try {
+        writer.close(context);
+      } catch (IOException | InterruptedException e) {
+        LOG.error("When closing {} on context {}",
+            writer, context, e);
+
+      }
+    }
+  }
+
+  /**
+   * Create a task attempt for a Job. This is based on the code
+   * run in the MR AM, creating a task (0) for the job, then a task
+   * attempt (0).
+   * @param jobId job ID
+   * @param jContext job context
+   * @return the task attempt.
+   */
+  public static TaskAttemptContext taskAttemptForJob(JobId jobId,
+      JobContext jContext) {
+    org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
+        MRBuilderUtils.newTaskId(jobId, 0,
+            org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP);
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+        MRBuilderUtils.newTaskAttemptId(taskID, 0);
+    return new TaskAttemptContextImpl(
+        jContext.getConfiguration(),
+        TypeConverter.fromYarn(attemptID));
+  }
+}
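
For readers less familiar with these helpers, here is a minimal sketch of how a subclass test might combine them. The class name, the test path and the touched file are illustrative assumptions and are not part of this patch; it also assumes the test is configured to use the fault-injecting (inconsistent) client, since setThrottling() requires it.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.contract.ContractTestUtils;
  import org.apache.hadoop.fs.s3a.S3AFileSystem;
  import org.junit.Test;

  /** Hypothetical subclass, for illustration only; not part of this patch. */
  public class ITestCommitHelpersExample extends AbstractCommitITest {

    @Test
    public void testHelperRoundTrip() throws Exception {
      S3AFileSystem fs = getFileSystem();
      Path dir = path("testHelperRoundTrip");
      // assumes the inconsistent client is in use: fail up to two
      // requests, each with a 50% probability of being throttled
      setThrottling(0.5f, 2);
      ContractTestUtils.touch(fs, new Path(dir, "child"));
      // clear the fault injection and wait out any injected inconsistency
      resetFailures();
      waitForConsistency();
      // no incomplete multipart uploads should be left behind
      assertEquals(0, countMultipartUploads(dir));
      assertNoMultipartUploadsPending(dir);
    }
  }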

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
new file mode 100644
index 0000000..13dfd83
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.collect.Sets;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.service.ServiceOperations;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
+
+/** Full integration test of an MR job. */
+public abstract class AbstractITCommitMRJob extends AbstractCommitITest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractITCommitMRJob.class);
+
+  private static final int TEST_FILE_COUNT = 2;
+  private static final int SCALE_TEST_FILE_COUNT = 20;
+
+  private static MiniDFSClusterService hdfs;
+  private static MiniMRYarnCluster yarn = null;
+  private static JobConf conf = null;
+  private boolean uniqueFilenames = false;
+  private boolean scaleTest;
+
+  protected static FileSystem getDFS() {
+    return hdfs.getClusterFS();
+  }
+
+  @BeforeClass
+  public static void setupClusters() throws IOException {
+    // the HDFS and YARN clusters share the same configuration, so
+    // the HDFS cluster binding is implicitly propagated to YARN
+    conf = new JobConf();
+    conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
+    conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
+
+    hdfs = new MiniDFSClusterService();
+    hdfs.init(conf);
+    hdfs.start();
+    yarn = new MiniMRYarnCluster("ITCommitMRJob", 2);
+    yarn.init(conf);
+    yarn.start();
+  }
+
+  @SuppressWarnings("ThrowableNotThrown")
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    conf = null;
+    ServiceOperations.stopQuietly(yarn);
+    ServiceOperations.stopQuietly(hdfs);
+    hdfs = null;
+    yarn = null;
+  }
+
+  public static MiniDFSCluster getHdfs() {
+    return hdfs.getCluster();
+  }
+
+  public static FileSystem getLocalFS() {
+    return hdfs.getLocalFS();
+  }
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  /**
+   * The name of the committer as returned by
+   * {@link AbstractS3ACommitter#getName()} and used for committer construction.
+   */
+  protected abstract String committerName();
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    scaleTest = getTestPropertyBool(
+        getConfiguration(),
+        KEY_SCALE_TESTS_ENABLED,
+        DEFAULT_SCALE_TESTS_ENABLED);
+  }
+
+  @Override
+  protected int getTestTimeoutMillis() {
+    return SCALE_TEST_TIMEOUT_SECONDS * 1000;
+  }
+
+  @Test
+  public void testMRJob() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    // final dest is in S3A
+    Path outputPath = path("testMRJob");
+
+    String commitUUID = UUID.randomUUID().toString();
+    String suffix = uniqueFilenames ? ("-" + commitUUID) : "";
+    int numFiles = getTestFileCount();
+    List<String> expectedFiles = new ArrayList<>(numFiles);
+    Set<String> expectedKeys = Sets.newHashSet();
+    for (int i = 0; i < numFiles; i += 1) {
+      File file = temp.newFile(String.valueOf(i) + ".text");
+      try (FileOutputStream out = new FileOutputStream(file)) {
+        out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
+      }
+      String filename = String.format("part-m-%05d%s", i, suffix);
+      Path path = new Path(outputPath, filename);
+      expectedFiles.add(path.toString());
+      expectedKeys.add("/" + fs.pathToKey(path));
+    }
+    Collections.sort(expectedFiles);
+
+    Job mrJob = Job.getInstance(yarn.getConfig(), "test-committer-job");
+    JobConf jobConf = (JobConf) mrJob.getConfiguration();
+    jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+        uniqueFilenames);
+
+
+    bindCommitter(jobConf,
+        CommitConstants.S3A_COMMITTER_FACTORY,
+        committerName());
+    // pass down the scale test flag
+    jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
+
+    mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
+    FileOutputFormat.setOutputPath(mrJob, outputPath);
+
+    File mockResultsFile = temp.newFile("committer.bin");
+    mockResultsFile.delete();
+    String committerPath = "file:" + mockResultsFile;
+    jobConf.set("mock-results-file", committerPath);
+    jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID);
+
+    mrJob.setInputFormatClass(TextInputFormat.class);
+    FileInputFormat.addInputPath(mrJob, new Path(temp.getRoot().toURI()));
+
+    mrJob.setMapperClass(MapClass.class);
+    mrJob.setNumReduceTasks(0);
+
+    // an attempt to set up log4j properly, which clearly doesn't work
+    URL log4j = getClass().getClassLoader().getResource("log4j.properties");
+    if (log4j != null && log4j.getProtocol().equals("file")) {
+      Path log4jPath = new Path(log4j.toURI());
+      LOG.debug("Using log4j path {}", log4jPath);
+      mrJob.addFileToClassPath(log4jPath);
+      String sysprops = String.format("-Xmx256m -Dlog4j.configuration=%s",
+          log4j);
+      jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops);
+      jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops);
+    }
+
+    applyCustomConfigOptions(jobConf);
+    // fail fast if anything goes wrong
+    mrJob.setMaxMapAttempts(1);
+
+    mrJob.submit();
+    try (DurationInfo d = new DurationInfo(LOG, "Job Execution")) {
+      boolean succeeded = mrJob.waitForCompletion(true);
+      assertTrue("MR job failed", succeeded);
+    }
+
+    waitForConsistency();
+    assertIsDirectory(outputPath);
+    FileStatus[] results = fs.listStatus(outputPath,
+        S3AUtils.HIDDEN_FILE_FILTER);
+    int fileCount = results.length;
+    List<String> actualFiles = new ArrayList<>(fileCount);
+    assertTrue("No files in output directory", fileCount != 0);
+    LOG.info("Found {} files", fileCount);
+    for (FileStatus result : results) {
+      LOG.debug("result: {}", result);
+      actualFiles.add(result.getPath().toString());
+    }
+    Collections.sort(actualFiles);
+
+    // load in the success data marker: this guarantees that an S3Guard
+    // committer was used
+    Path success = new Path(outputPath, _SUCCESS);
+    FileStatus status = fs.getFileStatus(success);
+    assertTrue("0 byte success file - not a s3guard committer " + success,
+        status.getLen() > 0);
+    SuccessData successData = SuccessData.load(fs, success);
+    String commitDetails = successData.toString();
+    LOG.info("Committer name " + committerName() + "\n{}",
+        commitDetails);
+    LOG.info("Committer statistics: \n{}",
+        successData.dumpMetrics("  ", " = ", "\n"));
+    LOG.info("Diagnostics\n{}",
+        successData.dumpDiagnostics("  ", " = ", "\n"));
+    assertEquals("Wrong committer in " + commitDetails,
+        committerName(), successData.getCommitter());
+    List<String> successFiles = successData.getFilenames();
+    assertTrue("No filenames in " + commitDetails,
+        !successFiles.isEmpty());
+
+    assertEquals("Should commit the expected files",
+        expectedFiles, actualFiles);
+
+    Set<String> summaryKeys = Sets.newHashSet();
+    summaryKeys.addAll(successFiles);
+    assertEquals("Summary keyset doesn't list the the expected paths "
+        + commitDetails, expectedKeys, summaryKeys);
+    assertPathDoesNotExist("temporary dir",
+        new Path(outputPath, CommitConstants.TEMPORARY));
+    customPostExecutionValidation(outputPath, successData);
+  }
+
+  /**
+   * Get the file count for the test.
+   * @return the number of mappers to create.
+   */
+  public int getTestFileCount() {
+    return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
+  }
+
+  /**
+   * Override point to let implementations tune the MR Job conf.
+   * @param c configuration
+   */
+  protected void applyCustomConfigOptions(Configuration c) {
+
+  }
+
+  /**
+   * Override point for any committer specific validation operations;
+   * called after the base assertions have all passed.
+   * @param destPath destination of work
+   * @param successData loaded success data
+   * @throws Exception failure
+   */
+  protected void customPostExecutionValidation(Path destPath,
+      SuccessData successData)
+      throws Exception {
+
+  }
+
+  /**
+   *  Test Mapper.
+   *  This is executed in a separate process, and must not make any assumptions
+   *  about external state.
+   */
+  public static class MapClass
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+    private int operations;
+    private String id = "";
+    private LongWritable l = new LongWritable();
+    private Text t = new Text();
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      super.setup(context);
+      // force in Log4J logging
+      org.apache.log4j.BasicConfigurator.configure();
+      boolean scaleMap = context.getConfiguration()
+          .getBoolean(KEY_SCALE_TESTS_ENABLED, false);
+      operations = scaleMap ? 1000 : 10;
+      id = context.getTaskAttemptID().toString();
+    }
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      for (int i = 0; i < operations; i++) {
+        l.set(i);
+        t.set(String.format("%s:%05d", id, i));
+        context.write(l, t);
+      }
+    }
+  }
+
+}
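
As a companion sketch, again illustrative and not part of the patch, a concrete subclass only needs to name its committer and can hook the two override points; the committer name "directory" and the class name are assumed example values.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.s3a.commit.files.SuccessData;

  /** Hypothetical concrete test; for illustration only. */
  public class ITestExampleCommitMRJob extends AbstractITCommitMRJob {

    @Override
    protected String committerName() {
      // must match the name under which the committer factory
      // registers the committer; "directory" is an assumed example
      return "directory";
    }

    @Override
    protected void applyCustomConfigOptions(Configuration conf) {
      // committer-specific job configuration can be applied here before submission
    }

    @Override
    protected void customPostExecutionValidation(Path destPath,
        SuccessData successData) throws Exception {
      // committer-specific assertions over the loaded _SUCCESS data go here
    }
  }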

