http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java
new file mode 100644
index 0000000..47d112d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitPaths.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/**
+ * Tests for {@link MagicCommitPaths} path operations.
+ */
+public class TestMagicCommitPaths extends Assert {
+
+  private static final List<String> MAGIC_AT_ROOT =
+      list(MAGIC);
+  private static final List<String> MAGIC_AT_ROOT_WITH_CHILD =
+      list(MAGIC, "child");
+  private static final List<String> MAGIC_WITH_CHILD =
+      list("parent", MAGIC, "child");
+  private static final List<String> MAGIC_AT_WITHOUT_CHILD =
+      list("parent", MAGIC);
+
+  private static final List<String> DEEP_MAGIC =
+      list("parent1", "parent2", MAGIC, "child1", "child2");
+
+  public static final String[] EMPTY = {};
+
+  @Test
+  public void testSplitPathEmpty() throws Throwable {
+    intercept(IllegalArgumentException.class,
+        () -> splitPathToElements(new Path("")));
+  }
+
+  @Test
+  public void testSplitPathDoubleSlash() {
+    assertPathSplits("//", EMPTY);
+  }
+
+  @Test
+  public void testSplitRootPath() {
+    assertPathSplits("/", EMPTY);
+  }
+
+  @Test
+  public void testSplitBasic() {
+    assertPathSplits("/a/b/c",
+        new String[]{"a", "b", "c"});
+  }
+
+  @Test
+  public void testSplitTrailingSlash() {
+    assertPathSplits("/a/b/c/",
+        new String[]{"a", "b", "c"});
+  }
+
+  @Test
+  public void testSplitShortPath() {
+    assertPathSplits("/a",
+        new String[]{"a"});
+  }
+
+  @Test
+  public void testSplitShortPathTrailingSlash() {
+    assertPathSplits("/a/",
+        new String[]{"a"});
+  }
+
+  @Test
+  public void testParentsMagicRoot() {
+    assertParents(EMPTY, MAGIC_AT_ROOT);
+  }
+
+  @Test
+  public void testChildrenMagicRoot() {
+    assertChildren(EMPTY, MAGIC_AT_ROOT);
+  }
+
+  @Test
+  public void testParentsMagicRootWithChild() {
+    assertParents(EMPTY, MAGIC_AT_ROOT_WITH_CHILD);
+  }
+
+  @Test
+  public void testChildMagicRootWithChild() {
+    assertChildren(a("child"), MAGIC_AT_ROOT_WITH_CHILD);
+  }
+
+  @Test
+  public void testChildrenMagicWithoutChild() {
+    assertChildren(EMPTY, MAGIC_AT_WITHOUT_CHILD);
+  }
+
+  @Test
+  public void testChildMagicWithChild() {
+    assertChildren(a("child"), MAGIC_WITH_CHILD);
+  }
+
+  @Test
+  public void testParentMagicWithChild() {
+    assertParents(a("parent"), MAGIC_WITH_CHILD);
+  }
+
+  @Test
+  public void testParentDeepMagic() {
+    assertParents(a("parent1", "parent2"), DEEP_MAGIC);
+  }
+
+  @Test
+  public void testChildrenDeepMagic() {
+    assertChildren(a("child1", "child2"), DEEP_MAGIC);
+  }
+
+  @Test
+  public void testLastElementEmpty() throws Throwable {
+    intercept(IllegalArgumentException.class,
+        () -> lastElement(new ArrayList<>(0)));
+  }
+
+  @Test
+  public void testLastElementSingle() {
+    assertEquals("first", lastElement(l("first")));
+  }
+
+  @Test
+  public void testLastElementDouble() {
+    assertEquals("2", lastElement(l("first", "2")));
+  }
+
+  @Test
+  public void testFinalDestinationNoMagic() {
+    assertEquals(l("first", "2"),
+        finalDestination(l("first", "2")));
+  }
+
+  @Test
+  public void testFinalDestinationMagic1() {
+    assertEquals(l("first", "2"),
+        finalDestination(l("first", MAGIC, "2")));
+  }
+
+  @Test
+  public void testFinalDestinationMagic2() {
+    assertEquals(l("first", "3.txt"),
+        finalDestination(l("first", MAGIC, "2", "3.txt")));
+  }
+
+  @Test
+  public void testFinalDestinationRootMagic2() {
+    assertEquals(l("3.txt"),
+        finalDestination(l(MAGIC, "2", "3.txt")));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFinalDestinationMagicNoChild() {
+    finalDestination(l(MAGIC));
+  }
+
+  @Test
+  public void testFinalDestinationBaseDirectChild() {
+    finalDestination(l(MAGIC, BASE, "3.txt"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFinalDestinationBaseNoChild() {
+    assertEquals(l(), finalDestination(l(MAGIC, BASE)));
+  }
+
+  @Test
+  public void testFinalDestinationBaseSubdirsChild() {
+    assertEquals(l("2", "3.txt"),
+        finalDestination(l(MAGIC, "4", BASE, "2", "3.txt")));
+  }
+
+  /**
+   * If the base is above the magic dir, it's ignored.
+   */
+  @Test
+  public void testFinalDestinationIgnoresBaseBeforeMagic() {
+    assertEquals(l(BASE, "home", "3.txt"),
+        finalDestination(l(BASE, "home", MAGIC, "2", "3.txt")));
+  }
+
+  /** varargs to array. */
+  private static String[] a(String... str) {
+    return str;
+  }
+
+  /** varargs to list. */
+  private static List<String> l(String... str) {
+    return Arrays.asList(str);
+  }
+
+  /**
+   * Varargs to list.
+   * @param args arguments
+   * @return a list
+   */
+  private static List<String> list(String... args) {
+    return Lists.newArrayList(args);
+  }
+
+  public void assertParents(String[] expected, List<String> elements) {
+    assertListEquals(expected, magicPathParents(elements));
+  }
+
+  public void assertChildren(String[] expected, List<String> elements) {
+    assertListEquals(expected, magicPathChildren(elements));
+  }
+
+  private void assertPathSplits(String pathString, String[] expected) {
+    Path path = new Path(pathString);
+    assertArrayEquals("From path " + path, expected,
+        splitPathToElements(path).toArray());
+  }
+
+  private void assertListEquals(String[] expected, List<String> actual) {
+    assertArrayEquals(expected, actual.toArray());
+  }
+
+}
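For readers tracing the path logic, the mapping these tests assert can be
condensed into a short sketch. It is illustrative only: MAGIC, BASE and
finalDestination come from the same static imports the test uses, while the
MagicPathDemo driver class is hypothetical.

    import java.util.Arrays;
    import java.util.List;

    import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
    import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;

    /** Hypothetical demo of the magic-path rewriting asserted above. */
    public class MagicPathDemo {
      public static void main(String[] args) {
        // Everything between MAGIC and BASE is task-attempt scratch space;
        // elements after BASE are retained relative to MAGIC's parent.
        List<String> withBase = Arrays.asList(
            "dest", MAGIC, "job1", "task1", BASE, "part-0000");
        System.out.println(finalDestination(withBase)); // [dest, part-0000]

        // Without BASE, only the last element (the filename) survives.
        List<String> noBase =
            Arrays.asList("dest", MAGIC, "task1", "part-0000");
        System.out.println(finalDestination(noBase));   // [dest, part-0000]
      }
    }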

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
new file mode 100644
index 0000000..5e6fb82
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test Tasks class.
+ */
+@RunWith(Parameterized.class)
+public class TestTasks extends HadoopTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(TestTasks.class);
+  public static final int ITEM_COUNT = 16;
+  private static final int FAILPOINT = 8;
+
+  private final int numThreads;
+  /**
+   * Thread pool for task execution.
+   */
+  private ExecutorService threadPool;
+  private final CounterTask failingTask
+      = new CounterTask("failing committer", FAILPOINT, Item::commit);
+
+  private final FailureCounter failures
+      = new FailureCounter("failures", 0, null);
+  private final CounterTask reverter
+      = new CounterTask("reverter", 0, Item::revert);
+  private final CounterTask aborter
+      = new CounterTask("aborter", 0, Item::abort);
+
+  /**
+   * Parameters for the test runs: how many threads to use.
+   * Threading makes some of the assertions brittle; there are
+   * more checks on single-threaded runs than on parallel ops.
+   * @return a list of parameter tuples.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {0},
+        {1},
+        {3},
+        {8},
+        {16},
+    });
+  }
+
+
+  private List<Item> items;
+
+
+  /**
+   * Construct the parameterized test.
+   * @param numThreads number of threads
+   */
+  public TestTasks(int numThreads) {
+    this.numThreads = numThreads;
+  }
+
+  /**
+   * In a parallel test run there is more than one thread doing the execution.
+   * @return true if the threadpool size is >1
+   */
+  public boolean isParallel() {
+    return numThreads > 1;
+  }
+
+  @Before
+  public void setup() {
+    items = IntStream.rangeClosed(1, ITEM_COUNT)
+        .mapToObj(i -> new Item(i,
+            String.format("With %d threads", numThreads)))
+        .collect(Collectors.toList());
+
+    if (numThreads > 0) {
+      threadPool = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat(getMethodName() + "-pool-%d")
+              .build());
+    }
+
+  }
+
+  @After
+  public void teardown() {
+    if (threadPool != null) {
+      threadPool.shutdown();
+      threadPool = null;
+    }
+  }
+
+  /**
+   * Create the builder.
+   * @return a builder pre-initialized with the item list and thread pool
+   */
+  private Tasks.Builder<Item> builder() {
+    return Tasks.foreach(items).executeWith(threadPool);
+  }
+
+  private void assertRun(Tasks.Builder<Item> builder,
+      CounterTask task) throws IOException {
+    boolean b = builder.run(task);
+    assertTrue("Run of " + task + " failed", b);
+  }
+
+  private void assertFailed(Tasks.Builder<Item> builder,
+      CounterTask task) throws IOException {
+    boolean b = builder.run(task);
+    assertFalse("Run of " + task + " unexpectedly succeeded", b);
+  }
+
+  private String itemsToString() {
+    return "[" + items.stream().map(Item::toString)
+        .collect(Collectors.joining("\n")) + "]";
+  }
+
+  @Test
+  public void testSimpleInvocation() throws Throwable {
+    CounterTask t = new CounterTask("simple", 0, Item::commit);
+    assertRun(builder(), t);
+    t.assertInvoked("", ITEM_COUNT);
+  }
+
+  @Test
+  public void testFailNoStoppingSuppressed() throws Throwable {
+    assertFailed(builder().suppressExceptions(), failingTask);
+    failingTask.assertInvoked("Continued through operations", ITEM_COUNT);
+    items.forEach(Item::assertCommittedOrFailed);
+  }
+
+  @Test
+  public void testFailFastSuppressed() throws Throwable {
+    assertFailed(builder()
+            .suppressExceptions()
+            .stopOnFailure(),
+        failingTask);
+    if (isParallel()) {
+      failingTask.assertInvokedAtLeast("stop fast", FAILPOINT);
+    } else {
+      failingTask.assertInvoked("stop fast", FAILPOINT);
+    }
+  }
+
+  @Test
+  public void testFailedCallAbortSuppressed() throws Throwable {
+    assertFailed(builder()
+            .stopOnFailure()
+            .suppressExceptions()
+            .abortWith(aborter),
+        failingTask);
+    failingTask.assertInvokedAtLeast("success", FAILPOINT);
+    if (!isParallel()) {
+      aborter.assertInvokedAtLeast("abort", 1);
+      // all uncommitted items were aborted
+      items.stream().filter(i -> !i.committed)
+          .forEach(Item::assertAborted);
+      items.stream().filter(i -> i.committed)
+          .forEach(i -> assertFalse(i.toString(), i.aborted));
+    }
+  }
+
+  @Test
+  public void testFailedCalledWhenNotStoppingSuppressed() throws Throwable {
+    assertFailed(builder()
+            .suppressExceptions()
+            .onFailure(failures),
+        failingTask);
+    failingTask.assertInvokedAtLeast("success", FAILPOINT);
+    // only one failure was triggered
+    failures.assertInvoked("failure event", 1);
+  }
+
+  @Test
+  public void testFailFastCallRevertSuppressed() throws Throwable {
+    assertFailed(builder()
+            .stopOnFailure()
+            .revertWith(reverter)
+            .abortWith(aborter)
+            .suppressExceptions()
+            .onFailure(failures),
+        failingTask);
+    failingTask.assertInvokedAtLeast("success", FAILPOINT);
+    if (!isParallel()) {
+      aborter.assertInvokedAtLeast("abort", 1);
+      // all uncommitted items were aborted
+      items.stream().filter(i -> !i.committed)
+          .filter(i -> !i.failed)
+          .forEach(Item::assertAborted);
+    }
+    // all committed were reverted
+    items.stream().filter(i -> i.committed && !i.failed)
+        .forEach(Item::assertReverted);
+    // all reverted items are committed
+    items.stream().filter(i -> i.reverted)
+        .forEach(Item::assertCommitted);
+
+    // only one failure was triggered
+    failures.assertInvoked("failure event", 1);
+  }
+
+  @Test
+  public void testFailSlowCallRevertSuppressed() throws Throwable {
+    assertFailed(builder()
+            .suppressExceptions()
+            .revertWith(reverter)
+            .onFailure(failures),
+        failingTask);
+    failingTask.assertInvokedAtLeast("success", FAILPOINT);
+    // all committed were reverted
+    // identify which task failed from the set
+    int failing = failures.getItem().id;
+    items.stream()
+        .filter(i -> i.id != failing)
+        .filter(i -> i.committed)
+        .forEach(Item::assertReverted);
+    // all reverted items are committed
+    items.stream().filter(i -> i.reverted)
+        .forEach(Item::assertCommitted);
+
+    // only one failure was triggered
+    failures.assertInvoked("failure event", 1);
+  }
+
+  @Test
+  public void testFailFastExceptions() throws Throwable {
+    intercept(IOException.class,
+        () -> builder()
+            .stopOnFailure()
+            .run(failingTask));
+    if (isParallel()) {
+      failingTask.assertInvokedAtLeast("stop fast", FAILPOINT);
+    } else {
+      failingTask.assertInvoked("stop fast", FAILPOINT);
+    }
+  }
+
+  @Test
+  public void testFailSlowExceptions() throws Throwable {
+    intercept(IOException.class,
+        () -> builder()
+            .run(failingTask));
+    failingTask.assertInvoked("continued through operations", ITEM_COUNT);
+    items.forEach(Item::assertCommittedOrFailed);
+  }
+
+  @Test
+  public void testFailFastExceptionsWithAbortFailure() throws Throwable {
+    CounterTask failFirst = new CounterTask("task", 1, Item::commit);
+    CounterTask a = new CounterTask("aborter", 1, Item::abort);
+    intercept(IOException.class,
+        () -> builder()
+            .stopOnFailure()
+            .abortWith(a)
+            .run(failFirst));
+    if (!isParallel()) {
+      // expect the other tasks to be aborted
+      a.assertInvokedAtLeast("abort", ITEM_COUNT - 1);
+    }
+  }
+
+  @Test
+  public void testFailFastExceptionsWithAbortFailureStopped()
+      throws Throwable {
+    CounterTask failFirst = new CounterTask("task", 1, Item::commit);
+    CounterTask a = new CounterTask("aborter", 1, Item::abort);
+    intercept(IOException.class,
+        () -> builder()
+            .stopOnFailure()
+            .stopAbortsOnFailure()
+            .abortWith(a)
+            .run(failFirst));
+    if (!isParallel()) {
+      // expect the other tasks to be aborted
+      a.assertInvoked("abort", 1);
+    }
+  }
+
+  /**
+   * Fail the last one committed; all the rest will be reverted.
+   * The actual ID of the failing task has to be picked up from the
+   * failure callback, as in a thread pool it may be any of them.
+   */
+  @Test
+  public void testRevertAllSuppressed() throws Throwable {
+    CounterTask failLast = new CounterTask("task", ITEM_COUNT, Item::commit);
+
+    assertFailed(builder()
+            .suppressExceptions()
+            .stopOnFailure()
+            .revertWith(reverter)
+            .abortWith(aborter)
+            .onFailure(failures),
+        failLast);
+    failLast.assertInvoked("success", ITEM_COUNT);
+    int abCount = aborter.getCount();
+    int revCount = reverter.getCount();
+    assertEquals(ITEM_COUNT, 1 + abCount + revCount);
+    // identify which task failed from the set
+    int failing = failures.getItem().id;
+    // all committed were reverted
+    items.stream()
+        .filter(i -> i.id != failing)
+        .filter(i -> i.committed)
+        .forEach(Item::assertReverted);
+    items.stream()
+        .filter(i -> i.id != failing)
+        .filter(i -> !i.committed)
+        .forEach(Item::assertAborted);
+    // all reverted items are committed
+    items.stream().filter(i -> i.reverted)
+        .forEach(Item::assertCommitted);
+
+    // only one failure was triggered
+    failures.assertInvoked("failure event", 1);
+  }
+
+
+  /**
+   * The Item which tasks process.
+   */
+  private final class Item {
+    private final int id;
+    private final String text;
+
+    private volatile boolean committed, aborted, reverted, failed;
+
+    private Item(int item, String text) {
+      this.id = item;
+      this.text = text;
+    }
+
+    boolean commit() {
+      committed = true;
+      return true;
+    }
+
+    boolean abort() {
+      aborted = true;
+      return true;
+    }
+
+    boolean revert() {
+      reverted = true;
+      return true;
+    }
+
+    boolean fail() {
+      failed = true;
+      return true;
+    }
+
+    public Item assertCommitted() {
+      assertTrue(toString() + " was not committed in\n"
+              +  itemsToString(),
+          committed);
+      return this;
+    }
+
+    public Item assertCommittedOrFailed() {
+      assertTrue(toString() + " was not committed nor failed in\n"
+              +  itemsToString(),
+          committed || failed);
+      return this;
+    }
+
+    public Item assertAborted() {
+      assertTrue(toString() + " was not aborted in\n"
+              + itemsToString(),
+          aborted);
+      return this;
+    }
+
+    public Item assertReverted() {
+      assertTrue(toString() + " was not reverted in\n"
+              + itemsToString(),
+          reverted);
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("Item{");
+      sb.append(String.format("[%02d]", id));
+      sb.append(", committed=").append(committed);
+      sb.append(", aborted=").append(aborted);
+      sb.append(", reverted=").append(reverted);
+      sb.append(", failed=").append(failed);
+      sb.append(", text=").append(text);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Class which can count invocations and, if limit > 0, will raise
+   * an exception on the specific invocation of {@link #process(Item)}
+   * whose count == limit.
+   */
+  private class BaseCounter {
+    private final AtomicInteger counter = new AtomicInteger(0);
+    private final int limit;
+    private final String name;
+    private Item item;
+    private final Optional<Function<Item, Boolean>> action;
+
+    /**
+     * Base counter, tracks items.
+     * @param name name for string/exception/logs.
+     * @param limit limit at which an exception is raised, 0 == never
+     * @param action optional action to invoke after the increment,
+     * before limit check
+     */
+    BaseCounter(String name,
+        int limit,
+        Function<Item, Boolean> action) {
+      this.name = name;
+      this.limit = limit;
+      this.action = Optional.ofNullable(action);
+    }
+
+    /**
+     * Apply the action to an item; log at info afterwards with both the
+     * before and after string values of the item.
+     * @param i item to process.
+     * @throws IOException failure in the action
+     */
+    void process(Item i) throws IOException {
+      this.item = i;
+      int count = counter.incrementAndGet();
+      if (limit == count) {
+        i.fail();
+        LOG.info("{}: Failed {}", this, i);
+        throw new IOException(String.format("%s: Limit %d reached for %s",
+            this, limit, i));
+      }
+      String before = i.toString();
+      action.ifPresent(a -> a.apply(i));
+      LOG.info("{}: {} -> {}", this, before, i);
+    }
+
+    int getCount() {
+      return counter.get();
+    }
+
+    Item getItem() {
+      return item;
+    }
+
+    void assertInvoked(String text, int expected) {
+      assertEquals(toString() + ": " + text, expected, getCount());
+    }
+
+    void assertInvokedAtLeast(String text, int expected) {
+      int actual = getCount();
+      assertTrue(toString() + ": " + text
+              + " - expected at least " + expected
+              + " invocations, but got " + actual
+              + " in " + itemsToString(),
+          expected <= actual);
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "BaseCounter{");
+      sb.append("name='").append(name).append('\'');
+      sb.append(", count=").append(counter.get());
+      sb.append(", limit=").append(limit);
+      sb.append(", item=").append(item);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  private final class CounterTask
+      extends BaseCounter implements Tasks.Task<Item, IOException> {
+
+    private CounterTask(String name, int limit,
+        Function<Item, Boolean> action) {
+      super(name, limit, action);
+    }
+
+    @Override
+    public void run(Item item) throws IOException {
+      process(item);
+    }
+
+  }
+
+  private final class FailureCounter
+      extends BaseCounter implements Tasks.FailureTask<Item, IOException> {
+    private Exception exception;
+
+    private FailureCounter(String name, int limit,
+        Function<Item, Boolean> action) {
+      super(name, limit, action);
+    }
+
+    @Override
+    public void run(Item item, Exception ex) throws IOException {
+      process(item);
+      this.exception = ex;
+    }
+
+    private Exception getException() {
+      return exception;
+    }
+
+  }
+
+}
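Taken together, these tests document a fluent API for running an operation
across a collection with pluggable failure handling. A minimal usage sketch,
restricted to the builder calls exercised in this file; the Item type and its
commit/revert/abort methods are the test's own, the pool size is arbitrary,
and LOG stands in for any SLF4J logger:

    // Sketch only: compose failure handling around a parallel operation.
    ExecutorService pool = Executors.newFixedThreadPool(4);
    boolean allSucceeded = Tasks.foreach(items)
        .executeWith(pool)             // omit to run on the calling thread
        .stopOnFailure()               // stop scheduling work after a failure
        .suppressExceptions()          // report failure via the return value
        .revertWith(Item::revert)      // undo items which already committed
        .abortWith(Item::abort)        // abort items which never committed
        .onFailure((item, ex) -> LOG.warn("Failed {}", item, ex))
        .run(Item::commit);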

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
new file mode 100644
index 0000000..57eb8b2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITMagicCommitMRJob.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/**
+ * Full integration test for the Magic Committer.
+ *
+ * There's no need to disable the committer setting for the filesystem here,
+ * because the committers are being instantiated in their own processes;
+ * the settings in {@link #applyCustomConfigOptions(Configuration)} are
+ * passed down to these processes.
+ */
+public class ITMagicCommitMRJob extends AbstractITCommitMRJob {
+
+  /**
+   * Need consistency here.
+   * @return false
+   */
+  @Override
+  public boolean useInconsistentClient() {
+    return false;
+  }
+
+  @Override
+  protected String committerName() {
+    return MagicS3GuardCommitter.NAME;
+  }
+
+  /**
+   * Turn on the magic commit support for the FS, else nothing will work.
+   * @param conf configuration
+   */
+  @Override
+  protected void applyCustomConfigOptions(Configuration conf) {
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
+  }
+
+  /**
+   * Check that the magic dir was cleaned up.
+   * {@inheritDoc}
+   */
+  @Override
+  protected void customPostExecutionValidation(Path destPath,
+      SuccessData successData) throws Exception {
+    assertPathDoesNotExist("No cleanup", new Path(destPath, MAGIC));
+  }
+}
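For context on how a real MR job picks this committer up at all: the factory
binding and committer name are configured alongside the magic switch used
above. A hedged sketch; S3A_COMMITTER_FACTORY and MAGIC_COMMITTER_ENABLED are
constants referenced elsewhere in this patch, while the literal key names are
assumptions taken from the committer documentation rather than from this diff:

    // Sketch: route the s3a scheme to the S3A committer factory and
    // select the magic committer by name.
    Configuration conf = new Configuration();
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        CommitConstants.S3A_COMMITTER_FACTORY);
    conf.set("fs.s3a.committer.name", "magic");
    conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);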

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
new file mode 100644
index 0000000..74c1d9d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.CommitUtils;
+import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection;
+import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/**
+ * Test the magic committer's commit protocol.
+ */
+public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
+
+  @Override
+  protected String suitename() {
+    return "ITestMagicCommitProtocol";
+  }
+
+  /**
+   * Need consistency here.
+   * @return false
+   */
+  @Override
+  public boolean useInconsistentClient() {
+    return false;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
+    return conf;
+  }
+
+  @Override
+  protected String getCommitterFactoryName() {
+    return CommitConstants.S3A_COMMITTER_FACTORY;
+  }
+
+  @Override
+  protected String getCommitterName() {
+    return CommitConstants.COMMITTER_NAME_MAGIC;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    CommitUtils.verifyIsMagicCommitFS(getFileSystem());
+  }
+
+  @Override
+  public void assertJobAbortCleanedUp(JobData jobData)
+      throws Exception {
+    // special handling of magic directory; harmless in staging
+    Path magicDir = new Path(getOutDir(), MAGIC);
+    ContractTestUtils.assertPathDoesNotExist(getFileSystem(),
+        "magic dir ", magicDir);
+    super.assertJobAbortCleanedUp(jobData);
+  }
+
+  @Override
+  protected AbstractS3ACommitter createCommitter(
+      Path outputPath,
+      TaskAttemptContext context)
+      throws IOException {
+    return new MagicS3GuardCommitter(outputPath, context);
+  }
+
+  public AbstractS3ACommitter createFailingCommitter(
+      TaskAttemptContext tContext) throws IOException {
+    return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
+  }
+
+  protected void validateTaskAttemptPathDuringWrite(Path p)
+      throws IOException {
+    String pathStr = p.toString();
+    assertTrue("not magic " + pathStr,
+        pathStr.contains(MAGIC));
+    assertPathDoesNotExist("task attempt visible", p);
+  }
+
+  protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
+    FileStatus st = getFileSystem().getFileStatus(p);
+    assertEquals("file length in " + st, 0, st.getLen());
+    Path pendingFile = new Path(p.toString() + PENDING_SUFFIX);
+    assertPathExists("pending file", pendingFile);
+  }
+
+  /**
+   * This class provides an overridden implementation of commitJobInternal
+   * which causes the commit to fail on the first attempt, then succeed.
+   */
+  private static final class CommitterWithFailedThenSucceed extends
+      MagicS3GuardCommitter implements CommitterFaultInjection {
+    private final CommitterFaultInjectionImpl injection;
+
+    CommitterWithFailedThenSucceed(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+      injection = new CommitterFaultInjectionImpl(outputPath, context, true);
+    }
+
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      injection.setupJob(context);
+      super.setupJob(context);
+    }
+
+    @Override
+    public void abortJob(JobContext context, JobStatus.State state)
+        throws IOException {
+      injection.abortJob(context, state);
+      super.abortJob(context, state);
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public void cleanupJob(JobContext context) throws IOException {
+      injection.cleanupJob(context);
+      super.cleanupJob(context);
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+      injection.setupTask(context);
+      super.setupTask(context);
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+      injection.commitTask(context);
+      super.commitTask(context);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+      injection.abortTask(context);
+      super.abortTask(context);
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      injection.commitJob(context);
+      super.commitJob(context);
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context)
+        throws IOException {
+      injection.needsTaskCommit(context);
+      return super.needsTaskCommit(context);
+    }
+
+    @Override
+    public void setFaults(CommitterFaultInjection.Faults... faults) {
+      injection.setFaults(faults);
+    }
+  }
+
+}
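The CommitterWithFailedThenSucceed wrapper above delegates every lifecycle
call to a CommitterFaultInjectionImpl before invoking the real committer, so
any injected fault fires first. The essence of a fail-once fault, sketched
independently of the real injection class (all names here are hypothetical):

    /** Hypothetical fail-once fault: throws on the first call only. */
    final class FailOnce {
      private final AtomicBoolean fired = new AtomicBoolean(false);

      void maybeFail(String operation) throws IOException {
        // compareAndSet guarantees exactly one caller sees the failure
        if (fired.compareAndSet(false, true)) {
          throw new IOException("Injected failure in " + operation);
        }
      }
    }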

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
new file mode 100644
index 0000000..e3a295b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.CommitOperations;
+import org.apache.hadoop.fs.s3a.commit.CommitUtils;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+
+
+/**
+ * Write a huge file via the magic commit mechanism,
+ * commit it and verify that it is there. This is needed to
+ * verify that the pending-upload mechanism works with multipart files
+ * of more than one part.
+ *
+ * This is a scale test.
+ */
+public class ITestS3AHugeMagicCommits extends AbstractSTestS3AHugeFiles {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestS3AHugeMagicCommits.class);
+
+  private Path magicDir;
+  private Path jobDir;
+
+  /** File used as the destination for the write;
+   *  only a zero-byte marker is ever created at this path. */
+  private Path magicOutputFile;
+
+  /** The file with the JSON data about the commit. */
+  private Path pendingDataFile;
+
+  /**
+   * Use fast upload on disk.
+   * @return the upload buffer mechanism.
+   */
+  protected String getBlockOutputBufferName() {
+    return Constants.FAST_UPLOAD_BUFFER_DISK;
+  }
+
+  /**
+   * The suite name; required to be unique.
+   * @return the test suite name
+   */
+  @Override
+  public String getTestSuiteName() {
+    return "ITestS3AHugeMagicCommits";
+  }
+
+  /**
+   * Create the scale IO conf with the committer enabled.
+   * @return the configuration to use for the test FS.
+   */
+  @Override
+  protected Configuration createScaleConfiguration() {
+    Configuration conf = super.createScaleConfiguration();
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    CommitUtils.verifyIsMagicCommitFS(getFileSystem());
+
+    // set up the paths for the commit operation
+    Path finalDirectory = new Path(getScaleTestDir(), "commit");
+    magicDir = new Path(finalDirectory, MAGIC);
+    jobDir = new Path(magicDir, "job_001");
+    String filename = "commit.bin";
+    setHugefile(new Path(finalDirectory, filename));
+    magicOutputFile = new Path(jobDir, filename);
+    pendingDataFile = new Path(jobDir, filename + PENDING_SUFFIX);
+  }
+
+  /**
+   * Returns the path of the file to create under the magic directory,
+   * not that of the final huge file.
+   * @return a file in the job dir
+   */
+  @Override
+  protected Path getPathOfFileToCreate() {
+    return magicOutputFile;
+  }
+
+  @Override
+  public void test_030_postCreationAssertions() throws Throwable {
+    describe("Committing file");
+    assertPathDoesNotExist("final file exists", getHugefile());
+    assertPathExists("No pending file", pendingDataFile);
+    S3AFileSystem fs = getFileSystem();
+
+    // a 0-byte marker is created, so a file exists at the destination
+    // path; it just MUST be 0 bytes long
+    FileStatus status = fs.getFileStatus(magicOutputFile);
+    assertEquals("Non empty marker file " + status,
+        0, status.getLen());
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    CommitOperations operations = new CommitOperations(fs);
+    Path destDir = getHugefile().getParent();
+    assertPathExists("Magic dir", new Path(destDir, CommitConstants.MAGIC));
+    String destDirKey = fs.pathToKey(destDir);
+    List<String> uploads = listMultipartUploads(fs, destDirKey);
+
+    assertEquals("Pending uploads: "
+        + uploads.stream()
+        .collect(Collectors.joining("\n")), 1, uploads.size());
+    assertNotNull("jobDir", jobDir);
+    Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
+        results = operations.loadSinglePendingCommits(jobDir, false);
+    for (SinglePendingCommit singlePendingCommit :
+        results.getKey().getCommits()) {
+      operations.commitOrFail(singlePendingCommit);
+    }
+    timer.end("time to commit %s", pendingDataFile);
+    // upload is no longer pending
+    uploads = listMultipartUploads(fs, destDirKey);
+    assertEquals("Pending uploads"
+            + uploads.stream().collect(Collectors.joining("\n")),
+        0, operations.listPendingUploadsUnderPath(destDir).size());
+    // at this point, the huge file exists, so the normal assertions
+    // on that file must be valid. Verify.
+    super.test_030_postCreationAssertions();
+  }
+
+  private void skipQuietly(String text) {
+    describe("Skipping: %s", text);
+  }
+
+  @Override
+  public void test_040_PositionedReadHugeFile() {
+    skipQuietly("test_040_PositionedReadHugeFile");
+  }
+
+  @Override
+  public void test_050_readHugeFile() {
+    skipQuietly("readHugeFile");
+  }
+
+  @Override
+  public void test_100_renameHugeFile() {
+    skipQuietly("renameHugeFile");
+  }
+
+  @Override
+  public void test_800_DeleteHugeFiles() throws IOException {
+    if (getFileSystem() != null) {
+      try {
+        getFileSystem().abortOutstandingMultipartUploads(0);
+      } catch (IOException e) {
+        LOG.info("Exception while purging old uploads", e);
+      }
+    }
+    try {
+      super.test_800_DeleteHugeFiles();
+    } finally {
+      ContractTestUtils.rm(getFileSystem(), magicDir, true, false);
+    }
+  }
+}
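For orientation, the paths wired up in setup() give the layout sketched
below; the "__magic" and ".pending" values are assumptions about what the
MAGIC and PENDING_SUFFIX constants resolve to. The commit step itself
reduces to the loop from test_030:

    // Layout under the scale test directory:
    //   commit/                                    final destination dir
    //   commit/commit.bin                          huge file; exists after commit
    //   commit/__magic/job_001/commit.bin          zero-byte marker from the write
    //   commit/__magic/job_001/commit.bin.pending  JSON data for the pending upload

    // Sketch of the commit step, using only calls visible in test_030.
    CommitOperations operations = new CommitOperations(fs);
    Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> results =
        operations.loadSinglePendingCommits(jobDir, false);
    for (SinglePendingCommit commit : results.getKey().getCommits()) {
      operations.commitOrFail(commit);
    }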

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java
new file mode 100644
index 0000000..47383b7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/MockedStagingCommitter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.ClientErrors;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.ClientResults;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Committer subclass that uses a mocked S3A connection for testing.
+ */
+class MockedStagingCommitter extends StagingCommitter {
+
+  MockedStagingCommitter(Path outputPath,
+      TaskAttemptContext context)
+      throws IOException {
+    super(outputPath, context);
+  }
+
+  /**
+   * Returns the mock FS without checking FS type.
+   * @param out output path
+   * @param config job/task config
+   * @return a filesystem.
+   * @throws IOException IO failure
+   */
+  @Override
+  protected FileSystem getDestinationFS(Path out, Configuration config)
+      throws IOException {
+    return out.getFileSystem(config);
+  }
+
+  @Override
+  public void commitJob(JobContext context) throws IOException {
+    // the success marker is suppressed (see maybeCreateSuccessMarker below),
+    // as that codepath isn't mocked yet.
+    super.commitJob(context);
+    Configuration conf = context.getConfiguration();
+    try {
+      String jobCommitterPath = conf.get("mock-results-file");
+      if (jobCommitterPath != null) {
+        try (ObjectOutputStream out = new ObjectOutputStream(
+            FileSystem.getLocal(conf)
+                .create(new Path(jobCommitterPath), false))) {
+          out.writeObject(getResults());
+        }
+      }
+    } catch (Exception e) {
+      // do nothing, the test will fail
+    }
+  }
+
+  @Override
+  protected void maybeCreateSuccessMarker(JobContext context,
+      List<String> filenames)
+      throws IOException {
+    // skipped
+  }
+
+  public ClientResults getResults() throws IOException {
+    MockS3AFileSystem mockFS = (MockS3AFileSystem)getDestS3AFS();
+    return mockFS.getOutcome().getKey();
+  }
+
+  public ClientErrors getErrors() throws IOException {
+    MockS3AFileSystem mockFS = (MockS3AFileSystem) getDestS3AFS();
+    return mockFS.getOutcome().getValue();
+  }
+
+  @Override
+  public String toString() {
+    return "MockedStagingCommitter{" + super.toString() + "}";
+  }
+}
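On the verification side, whatever commitJob serialized to the
"mock-results-file" path can be read back with the matching stream type. A
sketch, assuming ClientResults is Serializable (implied by the writeObject
call above):

    // Hypothetical reader for the results file written in commitJob.
    Path resultsFile = new Path(conf.get("mock-results-file"));
    try (ObjectInputStream in = new ObjectInputStream(
        FileSystem.getLocal(conf).open(resultsFile))) {
      ClientResults results = (ClientResults) in.readObject();
      // assertions against the recorded requests go here
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }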

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedCommitterForTesting.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedCommitterForTesting.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedCommitterForTesting.java
new file mode 100644
index 0000000..5adce15
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedCommitterForTesting.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Partitioned committer overridden for better testing.
+ */
+class PartitionedCommitterForTesting extends
+    PartitionedStagingCommitter {
+
+  PartitionedCommitterForTesting(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+  }
+
+  @Override
+  protected void initOutput(Path out) throws IOException {
+    super.initOutput(out);
+    setOutputPath(out);
+  }
+
+  /**
+   * Returns the mock FS without checking FS type.
+   * @param out output path
+   * @param config job/task config
+   * @return a filesystem.
+   * @throws IOException failure to get the FS
+   */
+  @Override
+  protected FileSystem getDestinationFS(Path out, Configuration config)
+      throws IOException {
+    return out.getFileSystem(config);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
new file mode 100644
index 0000000..38d5156
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/StagingTestBase.java
@@ -0,0 +1,724 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.MultipartUpload;
+import com.amazonaws.services.s3.model.MultipartUploadListing;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractCommitITest;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
+import org.apache.hadoop.fs.s3a.commit.MiniDFSClusterService;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Test base for mock tests of staging committers:
+ * core constants and static methods, inner classes
+ * for specific test types.
+ *
+ * Some of the verification methods here are unused; they are left
+ * in place in case changes to the implementation make the verifications
+ * relevant again.
+ */
+public class StagingTestBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StagingTestBase.class);
+
+  public static final String BUCKET = MockS3AFileSystem.BUCKET;
+  public static final String OUTPUT_PREFIX = "output/path";
+  public static final Path OUTPUT_PATH =
+      new Path("s3a://" + BUCKET + "/" + OUTPUT_PREFIX);
+  public static final URI OUTPUT_PATH_URI = OUTPUT_PATH.toUri();
+
+  protected StagingTestBase() {
+  }
+
+  /**
+   * Sets up the mock filesystem instance and binds it to the
+   * {@link FileSystem#get(URI, Configuration)} call for the supplied URI
+   * and config.
+   * All standard mocking setup MUST go here.
+   * @param conf config to use
+   * @param outcome tuple of outcomes to store in mock FS
+   * @return the filesystem created
+   * @throws IOException IO problems.
+   */
+  protected static S3AFileSystem createAndBindMockFSInstance(
+      Configuration conf,
+      Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> outcome)
+      throws IOException {
+    S3AFileSystem mockFs = mockS3AFileSystemRobustly();
+    MockS3AFileSystem wrapperFS = new MockS3AFileSystem(mockFs, outcome);
+    URI uri = OUTPUT_PATH_URI;
+    wrapperFS.initialize(uri, conf);
+    FileSystemTestHelper.addFileSystemForTesting(uri, conf, wrapperFS);
+    return mockFs;
+  }
+
+  private static S3AFileSystem mockS3AFileSystemRobustly() {
+    S3AFileSystem mockFS = mock(S3AFileSystem.class);
+    doNothing().when(mockFS).incrementReadOperations();
+    doNothing().when(mockFS).incrementWriteOperations();
+    return mockFS;
+  }
+
+  /**
+   * Look up the FS by URI, return a (cast) Mock wrapper.
+   * @param conf config
+   * @return the FS
+   * @throws IOException IO Failure
+   */
+  public static MockS3AFileSystem lookupWrapperFS(Configuration conf)
+      throws IOException {
+    return (MockS3AFileSystem) FileSystem.get(OUTPUT_PATH_URI, conf);
+  }
+
+  public static void verifyCompletion(FileSystem mockS3) throws IOException {
+    verifyCleanupTempFiles(mockS3);
+    verifyNoMoreInteractions(mockS3);
+  }
+
+  public static void verifyDeleted(FileSystem mockS3, Path path)
+      throws IOException {
+    verify(mockS3).delete(path, true);
+  }
+
+  public static void verifyDeleted(FileSystem mockS3, String child)
+      throws IOException {
+    verifyDeleted(mockS3, new Path(OUTPUT_PATH, child));
+  }
+
+  public static void verifyCleanupTempFiles(FileSystem mockS3)
+      throws IOException {
+    verifyDeleted(mockS3,
+        new Path(OUTPUT_PATH, CommitConstants.TEMPORARY));
+  }
+
+  protected static void assertConflictResolution(
+      StagingCommitter committer,
+      JobContext job,
+      ConflictResolution mode) {
+    Assert.assertEquals("Conflict resolution mode in " + committer,
+        mode, committer.getConflictResolutionMode(job, new Configuration()));
+  }
+
+  public static void pathsExist(FileSystem mockS3, String... children)
+      throws IOException {
+    for (String child : children) {
+      pathExists(mockS3, new Path(OUTPUT_PATH, child));
+    }
+  }
+
+  public static void pathExists(FileSystem mockS3, Path path)
+      throws IOException {
+    when(mockS3.exists(path)).thenReturn(true);
+  }
+
+  public static void pathDoesNotExist(FileSystem mockS3, Path path)
+      throws IOException {
+    when(mockS3.exists(path)).thenReturn(false);
+  }
+
+  public static void canDelete(FileSystem mockS3, String... children)
+      throws IOException {
+    for (String child : children) {
+      canDelete(mockS3, new Path(OUTPUT_PATH, child));
+    }
+  }
+
+  public static void canDelete(FileSystem mockS3, Path f) throws IOException {
+    when(mockS3.delete(f,
+        true /* recursive */))
+        .thenReturn(true);
+  }
+
+  public static void verifyExistenceChecked(FileSystem mockS3, String child)
+      throws IOException {
+    verifyExistenceChecked(mockS3, new Path(OUTPUT_PATH, child));
+  }
+
+  public static void verifyExistenceChecked(FileSystem mockS3, Path path)
+      throws IOException {
+    verify(mockS3).exists(path);
+  }
+
+  /**
+   * Provides setup/teardown of a MiniDFSCluster for tests that need one.
+   */
+  public static class MiniDFSTest extends HadoopTestBase {
+
+    private static MiniDFSClusterService hdfs;
+
+    private static JobConf conf = null;
+
+    protected static JobConf getConfiguration() {
+      return conf;
+    }
+
+    protected static FileSystem getDFS() {
+      return hdfs.getClusterFS();
+    }
+
+    /**
+     * Setup the mini HDFS cluster.
+     * @throws IOException Failure
+     */
+    @BeforeClass
+    @SuppressWarnings("deprecation")
+    public static void setupHDFS() throws IOException {
+      if (hdfs == null) {
+        JobConf c = new JobConf();
+        hdfs = new MiniDFSClusterService();
+        hdfs.init(c);
+        hdfs.start();
+        conf = c;
+      }
+    }
+
+    @SuppressWarnings("ThrowableNotThrown")
+    @AfterClass
+    public static void teardownFS() throws IOException {
+      ServiceOperations.stopQuietly(hdfs);
+      conf = null;
+      hdfs = null;
+    }
+
+  }
+
+  /**
+   * Base class for job committer tests.
+   * @param <C> committer
+   */
+  public abstract static class JobCommitterTest<C extends OutputCommitter>
+      extends HadoopTestBase {
+    private static final JobID JOB_ID = new JobID("job", 1);
+    private JobConf jobConf;
+
+    // created in setupJob()
+    private S3AFileSystem mockFS = null;
+    private MockS3AFileSystem wrapperFS = null;
+    private JobContext job = null;
+
+    // also created in setupJob()
+    private StagingTestBase.ClientResults results = null;
+    private StagingTestBase.ClientErrors errors = null;
+    private AmazonS3 mockClient = null;
+
+    @Before
+    public void setupJob() throws Exception {
+      this.jobConf = new JobConf();
+      jobConf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
+          UUID.randomUUID().toString());
+      jobConf.setBoolean(
+          CommitConstants.CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+          false);
+
+      this.job = new JobContextImpl(jobConf, JOB_ID);
+      this.results = new StagingTestBase.ClientResults();
+      this.errors = new StagingTestBase.ClientErrors();
+      this.mockClient = newMockS3Client(results, errors);
+      this.mockFS = createAndBindMockFSInstance(jobConf,
+          Pair.of(results, errors));
+      this.wrapperFS = lookupWrapperFS(jobConf);
+      // and bind the mock S3 client to the wrapper FS
+      wrapperFS.setAmazonS3Client(mockClient);
+    }
+
+    public S3AFileSystem getMockS3A() {
+      return mockFS;
+    }
+
+    public MockS3AFileSystem getWrapperFS() {
+      return wrapperFS;
+    }
+
+    public JobContext getJob() {
+      return job;
+    }
+
+    /**
+     * Create a task attempt for a job by creating a stub task ID.
+     * @return a task attempt
+     */
+    public TaskAttemptContext createTaskAttemptForJob() {
+      return AbstractCommitITest.taskAttemptForJob(
+          MRBuilderUtils.newJobId(1, JOB_ID.getId(), 1), job);
+    }
+
+    protected StagingTestBase.ClientResults getMockResults() {
+      return results;
+    }
+
+    protected StagingTestBase.ClientErrors getMockErrors() {
+      return errors;
+    }
+
+    abstract C newJobCommitter() throws Exception;
+  }
+
+  /** Abstract test of task commits. */
+  public abstract static class TaskCommitterTest<C extends OutputCommitter>
+      extends JobCommitterTest<C> {
+    private static final TaskAttemptID AID = new TaskAttemptID(
+        new TaskID(JobCommitterTest.JOB_ID, TaskType.REDUCE, 2), 3);
+
+    private C jobCommitter = null;
+    private TaskAttemptContext tac = null;
+    private File tempDir;
+
+    @Before
+    public void setupTask() throws Exception {
+      this.jobCommitter = newJobCommitter();
+      jobCommitter.setupJob(getJob());
+
+      this.tac = new TaskAttemptContextImpl(
+          new Configuration(getJob().getConfiguration()), AID);
+
+      // get the task's configuration copy so modifications take effect
+      String tmp = System.getProperty(
+          StagingCommitterConstants.JAVA_IO_TMPDIR);
+      tempDir = new File(tmp);
+      tac.getConfiguration().set(Constants.BUFFER_DIR, tmp + "/buffer");
+      tac.getConfiguration().set(
+          CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH,
+          tmp + "/cluster");
+    }
+
+    protected C getJobCommitter() {
+      return jobCommitter;
+    }
+
+    protected TaskAttemptContext getTAC() {
+      return tac;
+    }
+
+    abstract C newTaskCommitter() throws Exception;
+
+    protected File getTempDir() {
+      return tempDir;
+    }
+  }
+
+  /**
+   * Results accrued during mock runs.
+   * This data is serialized in the MR tests and read back in by the test runner.
+   */
+  public static class ClientResults implements Serializable {
+    private static final long serialVersionUID = -3137637327090709905L;
+    // For inspection of what the committer did
+    private final Map<String, InitiateMultipartUploadRequest> requests =
+        Maps.newHashMap();
+    private final List<String> uploads = Lists.newArrayList();
+    private final List<UploadPartRequest> parts = Lists.newArrayList();
+    private final Map<String, List<String>> tagsByUpload = Maps.newHashMap();
+    private final List<CompleteMultipartUploadRequest> commits =
+        Lists.newArrayList();
+    private final List<AbortMultipartUploadRequest> aborts
+        = Lists.newArrayList();
+    private final Map<String, String> activeUploads =
+        Maps.newHashMap();
+    private final List<DeleteObjectRequest> deletes = Lists.newArrayList();
+
+    public Map<String, InitiateMultipartUploadRequest> getRequests() {
+      return requests;
+    }
+
+    public List<String> getUploads() {
+      return uploads;
+    }
+
+    public List<UploadPartRequest> getParts() {
+      return parts;
+    }
+
+    public Map<String, List<String>> getTagsByUpload() {
+      return tagsByUpload;
+    }
+
+    public List<CompleteMultipartUploadRequest> getCommits() {
+      return commits;
+    }
+
+    public List<AbortMultipartUploadRequest> getAborts() {
+      return aborts;
+    }
+
+    public List<DeleteObjectRequest> getDeletes() {
+      return deletes;
+    }
+
+    public void resetDeletes() {
+      deletes.clear();
+    }
+
+    public void resetUploads() {
+      uploads.clear();
+      activeUploads.clear();
+    }
+
+    public void resetCommits() {
+      commits.clear();
+    }
+
+    public void resetRequests() {
+      requests.clear();
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          super.toString());
+      sb.append("{ requests=").append(requests.size());
+      sb.append(", uploads=").append(uploads.size());
+      sb.append(", parts=").append(parts.size());
+      sb.append(", tagsByUpload=").append(tagsByUpload.size());
+      sb.append(", commits=").append(commits.size());
+      sb.append(", aborts=").append(aborts.size());
+      sb.append(", deletes=").append(deletes.size());
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /** Control the errors to raise in the mock S3 client. */
+  public static class ClientErrors {
+    // For injecting errors
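+    // each trigger is the 0-based index of the operation which should
+    // fail; -1 (the default) disables that injection point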
+    private int failOnInit = -1;
+    private int failOnUpload = -1;
+    private int failOnCommit = -1;
+    private int failOnAbort = -1;
+    private boolean recover = false;
+
+    public void failOnInit(int initNum) {
+      this.failOnInit = initNum;
+    }
+
+    public void failOnUpload(int uploadNum) {
+      this.failOnUpload = uploadNum;
+    }
+
+    public void failOnCommit(int commitNum) {
+      this.failOnCommit = commitNum;
+    }
+
+    public void failOnAbort(int abortNum) {
+      this.failOnAbort = abortNum;
+    }
+
+    public void recoverAfterFailure() {
+      this.recover = true;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "ClientErrors{");
+      sb.append("failOnInit=").append(failOnInit);
+      sb.append(", failOnUpload=").append(failOnUpload);
+      sb.append(", failOnCommit=").append(failOnCommit);
+      sb.append(", failOnAbort=").append(failOnAbort);
+      sb.append(", recover=").append(recover);
+      sb.append('}');
+      return sb.toString();
+    }
+
+    public int getFailOnInit() {
+      return failOnInit;
+    }
+
+    public int getFailOnUpload() {
+      return failOnUpload;
+    }
+
+    public int getFailOnCommit() {
+      return failOnCommit;
+    }
+
+    public int getFailOnAbort() {
+      return failOnAbort;
+    }
+
+    public boolean isRecover() {
+      return recover;
+    }
+  }
+
+  /**
+   * Instantiate mock client with the results and errors requested.
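+   * <p>
+   * Illustrative use (mirrors {@code JobCommitterTest.setupJob()}; the
+   * {@code failOnCommit()} call is just an example of error injection):
+   * <pre>
+   *   ClientResults results = new ClientResults();
+   *   ClientErrors errors = new ClientErrors();
+   *   errors.failOnCommit(0);   // first completeMultipartUpload call fails
+   *   AmazonS3 client = newMockS3Client(results, errors);
+   * </pre>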
+   * @param results results to accrue
+   * @param errors when (if any) to fail
+   * @return the mock client to patch into a committer/FS instance
+   */
+  public static AmazonS3 newMockS3Client(final ClientResults results,
+      final ClientErrors errors) {
+    AmazonS3Client mockClient = mock(AmazonS3Client.class);
+    final Object lock = new Object();
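+    // all stubbed operations synchronize on this lock, keeping the shared
+    // results/errors state consistent when committers run in parallel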
+
+    // initiateMultipartUpload
+    when(mockClient
+        .initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)))
+        .thenAnswer(invocation -> {
+          LOG.debug("initiateMultipartUpload for {}", mockClient);
+          synchronized (lock) {
+            if (results.requests.size() == errors.failOnInit) {
+              if (errors.recover) {
+                errors.failOnInit(-1);
+              }
+              throw new AmazonClientException(
+                  "Mock Fail on init " + results.requests.size());
+            }
+            String uploadId = UUID.randomUUID().toString();
+            InitiateMultipartUploadRequest req = invocation.getArgumentAt(
+                0, InitiateMultipartUploadRequest.class);
+            results.requests.put(uploadId, req);
+            results.activeUploads.put(uploadId, req.getKey());
+            results.uploads.add(uploadId);
+            return newResult(results.requests.get(uploadId), uploadId);
+          }
+        });
+
+    // uploadPart
+    when(mockClient.uploadPart(any(UploadPartRequest.class)))
+        .thenAnswer(invocation -> {
+          LOG.debug("uploadPart for {}", mockClient);
+          synchronized (lock) {
+            if (results.parts.size() == errors.failOnUpload) {
+              if (errors.recover) {
+                errors.failOnUpload(-1);
+              }
+              LOG.info("Triggering upload failure");
+              throw new AmazonClientException(
+                  "Mock Fail on upload " + results.parts.size());
+            }
+            UploadPartRequest req = invocation.getArgumentAt(
+                0, UploadPartRequest.class);
+            results.parts.add(req);
+            String etag = UUID.randomUUID().toString();
+            List<String> etags = results.tagsByUpload.computeIfAbsent(
+                req.getUploadId(), k -> Lists.newArrayList());
+            etags.add(etag);
+            return newResult(req, etag);
+          }
+        });
+
+    // completeMultipartUpload
+    when(mockClient
+        .completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
+        .thenAnswer(invocation -> {
+          LOG.debug("completeMultipartUpload for {}", mockClient);
+          synchronized (lock) {
+            if (results.commits.size() == errors.failOnCommit) {
+              if (errors.recover) {
+                errors.failOnCommit(-1);
+              }
+              throw new AmazonClientException(
+                  "Mock Fail on commit " + results.commits.size());
+            }
+            CompleteMultipartUploadRequest req = invocation.getArgumentAt(
+                0, CompleteMultipartUploadRequest.class);
+            results.commits.add(req);
+            results.activeUploads.remove(req.getUploadId());
+
+            return newResult(req);
+          }
+        });
+
+    // abortMultipartUpload mocking
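+    // aborting an unknown upload ID raises a 404, as the live S3 store would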
+    doAnswer(invocation -> {
+      LOG.debug("abortMultipartUpload for {}", mockClient);
+      synchronized (lock) {
+        if (results.aborts.size() == errors.failOnAbort) {
+          if (errors.recover) {
+            errors.failOnAbort(-1);
+          }
+          throw new AmazonClientException(
+              "Mock Fail on abort " + results.aborts.size());
+        }
+        AbortMultipartUploadRequest req = invocation.getArgumentAt(
+            0, AbortMultipartUploadRequest.class);
+        String id = req.getUploadId();
+        String p = results.activeUploads.remove(id);
+        if (p == null) {
+          // upload doesn't exist
+          AmazonS3Exception ex = new AmazonS3Exception(
+              "not found " + id);
+          ex.setStatusCode(404);
+          throw ex;
+        }
+        results.aborts.add(req);
+        return null;
+      }
+    })
+        .when(mockClient)
+        .abortMultipartUpload(any(AbortMultipartUploadRequest.class));
+
+    // deleteObject mocking
+    doAnswer(invocation -> {
+      LOG.debug("deleteObject for {}", mockClient);
+      synchronized (lock) {
+        results.deletes.add(invocation.getArgumentAt(
+            0, DeleteObjectRequest.class));
+        return null;
+      }
+    })
+        .when(mockClient)
+        .deleteObject(any(DeleteObjectRequest.class));
+
+    // deleteObject(bucket, key) overload mocking
+    doAnswer(invocation -> {
+      LOG.debug("deleteObject(bucket, key) for {}", mockClient);
+      synchronized (lock) {
+        results.deletes.add(new DeleteObjectRequest(
+            invocation.getArgumentAt(0, String.class),
+            invocation.getArgumentAt(1, String.class)
+        ));
+        return null;
+      }
+    }).when(mockClient)
+        .deleteObject(any(String.class), any(String.class));
+
+    // toString() returns the debug information
+    when(mockClient.toString()).thenAnswer(
+        invocation -> "MockS3AClient " + results + " " + errors);
+
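+    // listMultipartUploads returns only the uploads which are still active:
+    // initiated, but neither completed nor aborted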
+    when(mockClient
+        .listMultipartUploads(any(ListMultipartUploadsRequest.class)))
+        .thenAnswer(invocation -> {
+          synchronized (lock) {
+            MultipartUploadListing l = new MultipartUploadListing();
+            l.setMultipartUploads(
+                results.activeUploads.entrySet().stream()
+                    .map(e -> newMPU(e.getKey(), e.getValue()))
+                    .collect(Collectors.toList()));
+            return l;
+          }
+        });
+
+    return mockClient;
+  }
+
+  private static CompleteMultipartUploadResult newResult(
+      CompleteMultipartUploadRequest req) {
+    return new CompleteMultipartUploadResult();
+  }
+
+  private static MultipartUpload newMPU(String id, String path) {
+    MultipartUpload up = new MultipartUpload();
+    up.setUploadId(id);
+    up.setKey(path);
+    return up;
+  }
+
+  private static UploadPartResult newResult(UploadPartRequest request,
+      String etag) {
+    UploadPartResult result = new UploadPartResult();
+    result.setPartNumber(request.getPartNumber());
+    result.setETag(etag);
+    return result;
+  }
+
+  private static InitiateMultipartUploadResult newResult(
+      InitiateMultipartUploadRequest request, String uploadId) {
+    InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
+    result.setUploadId(uploadId);
+    return result;
+  }
+
+  /**
+   * Create files in the attempt path that should be found by
+   * {@code getTaskOutput}.
+   * @param relativeFiles list of files relative to the attempt path
+   * @param attemptPath attempt path
+   * @param conf config for FS
+   * @throws IOException on any failure
+   */
+  public static void createTestOutputFiles(List<String> relativeFiles,
+      Path attemptPath,
+      Configuration conf) throws IOException {
+    // start with a clean attempt path
+    FileSystem attemptFS = attemptPath.getFileSystem(conf);
+    attemptFS.delete(attemptPath, true);
+    for (String relative : relativeFiles) {
+      // 0-length files are ignored, so write at least one byte
+      try (OutputStream out = attemptFS.create(new Path(attemptPath, relative))) {
+        out.write(34);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestPaths.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestPaths.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestPaths.java
new file mode 100644
index 0000000..ee6480a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestPaths.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.commit.staging.Paths.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test {@link org.apache.hadoop.fs.s3a.commit.staging.Paths}.
+ */
+public class TestPaths extends HadoopTestBase {
+
+  @Test
+  public void testUUIDPart() {
+    assertUUIDAdded("/part-0000", "/part-0000-UUID");
+  }
+
+  @Test
+  public void testUUIDPartSuffix() {
+    assertUUIDAdded("/part-0000.gz.csv", "/part-0000-UUID.gz.csv");
+  }
+
+  @Test
+  public void testUUIDDottedPath() {
+    assertUUIDAdded("/parent.dir/part-0000", "/parent.dir/part-0000-UUID");
+  }
+
+  @Test
+  public void testUUIDPartUUID() {
+    assertUUIDAdded("/part-0000-UUID.gz.csv", "/part-0000-UUID.gz.csv");
+  }
+
+  @Test
+  public void testUUIDParentUUID() {
+    assertUUIDAdded("/UUID/part-0000.gz.csv", "/UUID/part-0000.gz.csv");
+  }
+
+  @Test
+  public void testUUIDDir() throws Throwable {
+    intercept(IllegalStateException.class,
+        () -> addUUID("/dest/", "UUID"));
+  }
+
+  @Test
+  public void testUUIDEmptyDir() throws Throwable {
+    intercept(IllegalArgumentException.class,
+        () -> addUUID("", "UUID"));
+  }
+
+  @Test
+  public void testEmptyUUID() throws Throwable {
+    intercept(IllegalArgumentException.class,
+        () -> addUUID("part-0000.gz", ""));
+  }
+
+  private void assertUUIDAdded(String path, String expected) {
+    assertEquals("from " + path, expected, addUUID(path, "UUID"));
+  }
+
+  private static final String DATA = "s3a://landsat-pds/data/";
+  private static final Path BASE = new Path(DATA);
+
+  @Test
+  public void testRelativizeOneLevel() {
+    String suffix = "year=2017";
+    Path path = new Path(DATA + suffix);
+    assertEquals(suffix, getRelativePath(BASE, path));
+  }
+
+  @Test
+  public void testRelativizeTwoLevel() {
+    String suffix = "year=2017/month=10";
+    Path path = path(BASE, suffix);
+    assertEquals(suffix, getRelativePath(BASE, path));
+  }
+
+  @Test
+  public void testRelativizeSelf() {
+    assertEquals("", getRelativePath(BASE, BASE));
+  }
+
+  @Test
+  public void testRelativizeParent() {
+    // relativizing against the parent of the base yields "/"
+    assertEquals("/", getRelativePath(BASE, BASE.getParent()));
+  }
+
+  @Test
+  public void testGetPartition() {
+    assertEquals("year=2017/month=10",
+        getPartition("year=2017/month=10/part-0000.avro"));
+  }
+
+  @Test
+  public void testMPUCommitDir() throws Throwable {
+    Configuration conf = new Configuration();
+    LocalFileSystem localFS = FileSystem.getLocal(conf);
+    Path dir = getMultipartUploadCommitsDirectory(localFS, conf, "UUID");
+    assertTrue(dir.toString().endsWith("UUID/"
+        + StagingCommitterConstants.STAGING_UPLOADS));
+  }
+
+}

