http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
new file mode 100644
index 0000000..b6b6b97
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for parallel execution; it takes closures for the various
+ * actions.
+ * There is no retry logic: retries are expected to be handled by the closures.
+ */
+public final class Tasks {
+  private static final Logger LOG = LoggerFactory.getLogger(Tasks.class);
+
+  private Tasks() {
+  }
+
+  /**
+   * Callback invoked to process an item.
+   * @param <I> item type being processed
+   * @param <E> exception class which may be raised
+   */
+  @FunctionalInterface
+  public interface Task<I, E extends Exception> {
+    void run(I item) throws E;
+  }
+
+  /**
+   * Callback invoked on a failure.
+   * @param <I> item type being processed
+   * @param <E> exception class which may be raised
+   */
+  @FunctionalInterface
+  public interface FailureTask<I, E extends Exception> {
+
+    /**
+     * process a failure.
+     * @param item item the task is processing
+     * @param exception the exception which was raised.
+     * @throws E Exception of type E
+     */
+    void run(I item, Exception exception) throws E;
+  }
+
+  /**
+   * Builder for task execution.
+   * @param <I> item type
+   */
+  public static class Builder<I> {
+    private final Iterable<I> items;
+    private ExecutorService service = null;
+    private FailureTask<I, ?> onFailure = null;
+    private boolean stopOnFailure = false;
+    private boolean suppressExceptions = false;
+    private Task<I, ?> revertTask = null;
+    private boolean stopRevertsOnFailure = false;
+    private Task<I, ?> abortTask = null;
+    private boolean stopAbortsOnFailure = false;
+
+    /**
+     * Create the builder.
+     * @param items items to process
+     */
+    Builder(Iterable<I> items) {
+      this.items = items;
+    }
+
+    /**
+     * Declare executor service: if null, the tasks are executed in a single
+     * thread.
+     * @param executorService service to schedule tasks with.
+     * @return this builder.
+     */
+    public Builder<I> executeWith(ExecutorService executorService) {
+      this.service = executorService;
+      return this;
+    }
+
+    public Builder<I> onFailure(FailureTask<I, ?> task) {
+      this.onFailure = task;
+      return this;
+    }
+
+    public Builder<I> stopOnFailure() {
+      this.stopOnFailure = true;
+      return this;
+    }
+
+    public Builder<I> suppressExceptions() {
+      return suppressExceptions(true);
+    }
+
+    public Builder<I> suppressExceptions(boolean suppress) {
+      this.suppressExceptions = suppress;
+      return this;
+    }
+
+    public Builder<I> revertWith(Task<I, ?> task) {
+      this.revertTask = task;
+      return this;
+    }
+
+    public Builder<I> stopRevertsOnFailure() {
+      this.stopRevertsOnFailure = true;
+      return this;
+    }
+
+    public Builder<I> abortWith(Task<I, ?> task) {
+      this.abortTask = task;
+      return this;
+    }
+
+    public Builder<I> stopAbortsOnFailure() {
+      this.stopAbortsOnFailure = true;
+      return this;
+    }
+
+    public <E extends Exception> boolean run(Task<I, E> task) throws E {
+      if (service != null) {
+        return runParallel(task);
+      } else {
+        return runSingleThreaded(task);
+      }
+    }
+
+    private <E extends Exception> boolean runSingleThreaded(Task<I, E> task)
+        throws E {
+      List<I> succeeded = new ArrayList<>();
+      List<Exception> exceptions = new ArrayList<>();
+
+      Iterator<I> iterator = items.iterator();
+      boolean threw = true;
+      try {
+        while (iterator.hasNext()) {
+          I item = iterator.next();
+          try {
+            task.run(item);
+            succeeded.add(item);
+
+          } catch (Exception e) {
+            exceptions.add(e);
+
+            if (onFailure != null) {
+              try {
+                onFailure.run(item, e);
+              } catch (Exception failException) {
+                LOG.error("Failed to clean up on failure", failException);
+                // keep going
+              }
+            }
+
+            if (stopOnFailure) {
+              break;
+            }
+          }
+        }
+
+        threw = false;
+
+      } finally {
+        // "threw" tracks exceptions that were *not* caught by the catch
+        // block; exceptions that were caught (and possibly handled by
+        // onFailure) are kept in "exceptions".
+        if (threw || !exceptions.isEmpty()) {
+          if (revertTask != null) {
+            boolean failed = false;
+            for (I item : succeeded) {
+              try {
+                revertTask.run(item);
+              } catch (Exception e) {
+                LOG.error("Failed to revert task", e);
+                failed = true;
+                // keep going
+              }
+              if (stopRevertsOnFailure && failed) {
+                break;
+              }
+            }
+          }
+
+          if (abortTask != null) {
+            boolean failed = false;
+            while (iterator.hasNext()) {
+              try {
+                abortTask.run(iterator.next());
+              } catch (Exception e) {
+                failed = true;
+                LOG.error("Failed to abort task", e);
+                // keep going
+              }
+              if (stopAbortsOnFailure && failed) {
+                break;
+              }
+            }
+          }
+        }
+      }
+
+      if (!suppressExceptions && !exceptions.isEmpty()) {
+        Tasks.<E>throwOne(exceptions);
+      }
+
+      return !threw && exceptions.isEmpty();
+    }
+
+    private <E extends Exception> boolean runParallel(final Task<I, E> task)
+        throws E {
+      final Queue<I> succeeded = new ConcurrentLinkedQueue<>();
+      final Queue<Exception> exceptions = new ConcurrentLinkedQueue<>();
+      final AtomicBoolean taskFailed = new AtomicBoolean(false);
+      final AtomicBoolean abortFailed = new AtomicBoolean(false);
+      final AtomicBoolean revertFailed = new AtomicBoolean(false);
+
+      List<Future<?>> futures = new ArrayList<>();
+
+      for (final I item : items) {
+        // submit a task for each item that will either run or abort the task
+        futures.add(service.submit(new Runnable() {
+          @Override
+          public void run() {
+            if (!(stopOnFailure && taskFailed.get())) {
+              // run the task
+              boolean threw = true;
+              try {
+                LOG.debug("Executing task");
+                task.run(item);
+                succeeded.add(item);
+                LOG.debug("Task succeeded");
+
+                threw = false;
+
+              } catch (Exception e) {
+                taskFailed.set(true);
+                exceptions.add(e);
+                LOG.info("Task failed", e);
+
+                if (onFailure != null) {
+                  try {
+                    onFailure.run(item, e);
+                  } catch (Exception failException) {
+                    LOG.error("Failed to clean up on failure", failException);
+                    // swallow the exception
+                  }
+                }
+              } finally {
+                if (threw) {
+                  taskFailed.set(true);
+                }
+              }
+
+            } else if (abortTask != null) {
+              // abort the task instead of running it
+              if (stopAbortsOnFailure && abortFailed.get()) {
+                return;
+              }
+
+              boolean failed = true;
+              try {
+                LOG.info("Aborting task");
+                abortTask.run(item);
+                failed = false;
+              } catch (Exception e) {
+                LOG.error("Failed to abort task", e);
+                // swallow the exception
+              } finally {
+                if (failed) {
+                  abortFailed.set(true);
+                }
+              }
+            }
+          }
+        }));
+      }
+
+      // let the above tasks complete (or abort)
+      waitFor(futures);
+      int futureCount = futures.size();
+      futures.clear();
+
+      if (taskFailed.get() && revertTask != null) {
+        // at least one task failed, revert any that succeeded
+        LOG.info("Reverting all {} successful tasks from {} futures",
+            succeeded.size(), futureCount);
+        for (final I item : succeeded) {
+          futures.add(service.submit(() -> {
+            if (stopRevertsOnFailure && revertFailed.get()) {
+              return;
+            }
+
+            boolean failed = true;
+            try {
+              revertTask.run(item);
+              failed = false;
+            } catch (Exception e) {
+              LOG.error("Failed to revert task", e);
+              // swallow the exception
+            } finally {
+              if (failed) {
+                revertFailed.set(true);
+              }
+            }
+          }));
+        }
+
+        // let the revert tasks complete
+        waitFor(futures);
+      }
+
+      if (!suppressExceptions && !exceptions.isEmpty()) {
+        Tasks.<E>throwOne(exceptions);
+      }
+
+      return !taskFailed.get();
+    }
+  }
+
+  /**
+   * Wait for all the futures to complete; there's a small sleep between
+   * each iteration, enough to yield the CPU.
+   * @param futures futures.
+   */
+  private static void waitFor(Collection<Future<?>> futures) {
+    int size = futures.size();
+    LOG.debug("Waiting for {} tasks to complete", size);
+    int oldNumFinished = 0;
+    while (true) {
+      int numFinished = (int) futures.stream().filter(Future::isDone).count();
+
+      if (oldNumFinished != numFinished) {
+        LOG.debug("Finished count -> {}/{}", numFinished, size);
+        oldNumFinished = numFinished;
+      }
+
+      if (numFinished == size) {
+        // all of the futures are done, stop looping
+        break;
+      } else {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          futures.forEach(future -> future.cancel(true));
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+
+  public static <I> Builder<I> foreach(Iterable<I> items) {
+    return new Builder<>(items);
+  }
+
+  public static <I> Builder<I> foreach(I[] items) {
+    return new Builder<>(Arrays.asList(items));
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <E extends Exception> void throwOne(
+      Collection<Exception> exceptions)
+      throws E {
+    Iterator<Exception> iter = exceptions.iterator();
+    Exception e = iter.next();
+    Class<? extends Exception> exceptionClass = e.getClass();
+
+    while (iter.hasNext()) {
+      Exception other = iter.next();
+      if (!exceptionClass.isInstance(other)) {
+        e.addSuppressed(other);
+      }
+    }
+
+    Tasks.<E>castAndThrow(e);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <E extends Exception> void castAndThrow(Exception e) throws E {
+    if (e instanceof RuntimeException) {
+      throw (RuntimeException) e;
+    }
+    throw (E) e;
+  }
+}
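
The builder above is meant to be driven as a fluent chain. A minimal usage
sketch, in which the pendingUploads list, threadPool and the
commitOne/cleanupOne/revertOne/abortOne helpers are all hypothetical
stand-ins for real job state:

    boolean allSucceeded = Tasks.foreach(pendingUploads)
        .executeWith(threadPool)       // omit for single-threaded execution
        .stopOnFailure()               // schedule no new work after a failure
        .onFailure((upload, e) -> cleanupOne(upload))
        .revertWith(upload -> revertOne(upload))  // undo items that succeeded
        .abortWith(upload -> abortOne(upload))    // cancel items not yet run
        .run(upload -> commitOne(upload));

If the chain is not asked to suppressExceptions(), run() rethrows one of the
task exceptions (with the others attached as suppressed), so failures also
propagate to the caller.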

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/ValidationFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/ValidationFailure.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/ValidationFailure.java
new file mode 100644
index 0000000..ac29932
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/ValidationFailure.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+
+/**
+ * Exception raised on validation failures; kept as an IOException
+ * for consistency with other failures.
+ */
+public class ValidationFailure extends IOException {
+
+  /**
+   * Create an instance with string formatting applied to the message
+   * and arguments.
+   * @param message error message
+   * @param args optional list of arguments
+   */
+  public ValidationFailure(String message, Object... args) {
+    super(String.format(message, args));
+  }
+
+  /**
+   * Verify that a condition holds.
+   * @param expression expression which must be true
+   * @param message message to raise on a failure
+   * @param args arguments for the message formatting
+   * @throws ValidationFailure on a failure
+   */
+  public static void verify(boolean expression,
+      String message,
+      Object... args) throws ValidationFailure {
+    if (!expression) {
+      throw new ValidationFailure(message, args);
+    }
+  }
+}
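
verify() is intended as a guard clause in the style of Guava's
Preconditions.checkArgument(), but raising an IOException subclass so callers
in the IO path need no extra handling. A small sketch; the etags parameter is
hypothetical:

    static void checkEtags(List<String> etags) throws ValidationFailure {
      ValidationFailure.verify(etags != null, "No etag list");
      ValidationFailure.verify(!etags.isEmpty(), "Empty etag list");
    }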

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
new file mode 100644
index 0000000..c0d7415
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PendingSet.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.files;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.ValidationFailure;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.validateCollectionClass;
+import static org.apache.hadoop.fs.s3a.commit.ValidationFailure.verify;
+
+/**
+ * Persistent format for multiple pending commits.
+ * Contains 0 or more {@link SinglePendingCommit} entries; validation logic
+ * checks those values on load.
+ */
+@SuppressWarnings("unused")
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PendingSet extends PersistentCommitData {
+  private static final Logger LOG = LoggerFactory.getLogger(PendingSet.class);
+
+  /**
+   * Supported version value: {@value}.
+   * If this is changed, the value of {@link #serialVersionUID} will change,
+   * to avoid deserialization problems.
+   */
+  public static final int VERSION = 1;
+
+  /**
+   * Serialization ID: {@value}.
+   */
+  private static final long serialVersionUID = 0x11000 + VERSION;
+
+
+  /** Version marker. */
+  private int version = VERSION;
+
+  /**
+   * Commit list.
+   */
+  private List<SinglePendingCommit> commits;
+
+  /**
+   * Any custom extra data committer subclasses may choose to add.
+   */
+  private final Map<String, String> extraData = new HashMap<>(0);
+
+  public PendingSet() {
+    this(0);
+  }
+
+
+  public PendingSet(int size) {
+    commits = new ArrayList<>(size);
+  }
+
+  /**
+   * Get a JSON serializer for this class.
+   * @return a serializer.
+   */
+  public static JsonSerialization<PendingSet> serializer() {
+    return new JsonSerialization<>(PendingSet.class, false, true);
+  }
+
+  /**
+   * Load an instance from a file, then validate it.
+   * @param fs filesystem
+   * @param path path
+   * @return the loaded instance
+   * @throws IOException IO failure
+   * @throws ValidationFailure if the data is invalid
+   */
+  public static PendingSet load(FileSystem fs, Path path)
+      throws IOException {
+    LOG.debug("Reading pending commits in file {}", path);
+    PendingSet instance = serializer().load(fs, path);
+    instance.validate();
+    return instance;
+  }
+
+  /**
+   * Add a commit.
+   * @param commit the single commit
+   */
+  public void add(SinglePendingCommit commit) {
+    commits.add(commit);
+  }
+
+  /**
+   * Deserialize via java Serialization API: deserialize the instance
+   * and then call {@link #validate()} to verify that the deserialized
+   * data is valid.
+   * @param inStream input stream
+   * @throws IOException IO problem or validation failure
+   * @throws ClassNotFoundException reflection problems
+   */
+  private void readObject(ObjectInputStream inStream) throws IOException,
+      ClassNotFoundException {
+    inStream.defaultReadObject();
+    validate();
+  }
+
+  /**
+   * Validate the data: all fields which must be non-empty must be set.
+   * @throws ValidationFailure if the data is invalid
+   */
+  public void validate() throws ValidationFailure {
+    verify(version == VERSION, "Wrong version: %s", version);
+    validateCollectionClass(extraData.keySet(), String.class);
+    validateCollectionClass(extraData.values(), String.class);
+    Set<String> destinations = new HashSet<>(commits.size());
+    validateCollectionClass(commits, SinglePendingCommit.class);
+    for (SinglePendingCommit c : commits) {
+      c.validate();
+      verify(!destinations.contains(c.getDestinationKey()),
+          "Destination %s is written to by more than one pending commit",
+          c.getDestinationKey());
+      destinations.add(c.getDestinationKey());
+    }
+  }
+
+  @Override
+  public byte[] toBytes() throws IOException {
+    return serializer().toBytes(this);
+  }
+
+  /**
+   * Number of commits.
+   * @return the number of commits in this structure.
+   */
+  public int size() {
+    return commits != null ? commits.size() : 0;
+  }
+
+  @Override
+  public void save(FileSystem fs, Path path, boolean overwrite)
+      throws IOException {
+    serializer().save(fs, path, this, overwrite);
+  }
+
+  /** @return the version marker. */
+  public int getVersion() {
+    return version;
+  }
+
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+  /**
+   * @return commit list.
+   */
+  public List<SinglePendingCommit> getCommits() {
+    return commits;
+  }
+
+  public void setCommits(List<SinglePendingCommit> commits) {
+    this.commits = commits;
+  }
+}
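
A sketch of the intended round trip, with fs and pendingsetPath as
hypothetical values: a task gathers its single pending commits into a
PendingSet and saves it; the job committer later reloads the file, with
validation running automatically on load.

    PendingSet pendingSet = new PendingSet(commits.size());
    for (SinglePendingCommit commit : commits) {
      pendingSet.add(commit);
    }
    pendingSet.save(fs, pendingsetPath, true);    // overwrite if present

    // later, during job commit:
    PendingSet loaded = PendingSet.load(fs, pendingsetPath);
    for (SinglePendingCommit commit : loaded.getCommits()) {
      // complete (or abort) the multipart upload each commit describes
    }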

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java
new file mode 100644
index 0000000..cc27d07
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/PersistentCommitData.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.files;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.ValidationFailure;
+
+/**
+ * Class for single/multiple commit data structures.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class PersistentCommitData implements Serializable {
+
+  /**
+   * Supported version value: {@value}.
+   * If this is changed, the value of {@code serialVersionUID} will change,
+   * to avoid deserialization problems.
+   */
+  public static final int VERSION = 1;
+
+  /**
+   * Validate the data: all fields which must be non-empty must be set.
+   * @throws ValidationFailure if the data is invalid
+   */
+  public abstract void validate() throws ValidationFailure;
+
+  /**
+   * Serialize to JSON and then to a byte array, after performing a
+   * preflight validation of the data to be saved.
+   * @return the data in a persistable form.
+   * @throws IOException serialization problem or validation failure.
+   */
+  public abstract byte[] toBytes() throws IOException;
+
+  /**
+   * Save to a hadoop filesystem.
+   * @param fs filesystem
+   * @param path path
+   * @param overwrite should any existing file be overwritten
+   * @throws IOException IO exception
+   */
+  public abstract void save(FileSystem fs, Path path, boolean overwrite)
+      throws IOException;
+
+}
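
A subclass therefore supplies only the validation logic and its own JSON
binding. A minimal hypothetical subclass, following the pattern the concrete
classes in this package use (imports as in the file above, plus
org.apache.hadoop.util.JsonSerialization):

    public class ExampleCommitData extends PersistentCommitData {
      private String jobId;

      private static JsonSerialization<ExampleCommitData> serializer() {
        return new JsonSerialization<>(ExampleCommitData.class, false, true);
      }

      @Override
      public void validate() throws ValidationFailure {
        ValidationFailure.verify(jobId != null, "No jobId");
      }

      @Override
      public byte[] toBytes() throws IOException {
        validate();
        return serializer().toBytes(this);
      }

      @Override
      public void save(FileSystem fs, Path path, boolean overwrite)
          throws IOException {
        serializer().save(fs, path, this, overwrite);
      }
    }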

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
new file mode 100644
index 0000000..85cc38a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.files;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.services.s3.model.PartETag;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.ValidationFailure;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.validateCollectionClass;
+import static org.apache.hadoop.fs.s3a.commit.ValidationFailure.verify;
+import static org.apache.hadoop.util.StringUtils.join;
+
+/**
+ * This is the serialization format for uploads yet to be committed.
+ *
+ * It is marked as {@link Serializable} so that it can be passed in RPC
+ * calls; for this to work it relies on the fact that java.util ArrayList
+ * and LinkedList are serializable. If any other list type is used for etags,
+ * it must also be serializable. Jackson, which expects lists, is used to
+ * persist the data to disk.
+ *
+ */
+@SuppressWarnings("unused")
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SinglePendingCommit extends PersistentCommitData
+    implements Iterable<String> {
+
+  /**
+   * Serialization ID: {@value}.
+   */
+  private static final long serialVersionUID = 0x10000 + VERSION;
+
+  /** Version marker. */
+  private int version = VERSION;
+
+  /**
+   * This is the filename of the pending file itself.
+   * Used during processing; its persisted value, if any, is ignored.
+   */
+  private String filename;
+
+  /** Path URI of the destination. */
+  private String uri = "";
+
+  /** ID of the upload. */
+  private String uploadId;
+
+  /** Destination bucket. */
+  private String bucket;
+
+  /** Destination key in the bucket. */
+  private String destinationKey;
+
+  /** When was the upload created? */
+  private long created;
+
+  /** When was the upload saved? */
+  private long saved;
+
+  /** timestamp as date; no expectation of parseability. */
+  private String date;
+
+  /** Job ID, if known. */
+  private String jobId = "";
+
+  /** Task ID, if known. */
+  private String taskId = "";
+
+  /** Arbitrary notes. */
+  private String text = "";
+
+  /** Ordered list of etags. */
+  private List<String> etags;
+
+  /**
+   * Any custom extra data committer subclasses may choose to add.
+   */
+  private Map<String, String> extraData = new HashMap<>(0);
+
+  /** Destination file size. */
+  private long length;
+
+  public SinglePendingCommit() {
+  }
+
+  /**
+   * Get a JSON serializer for this class.
+   * @return a serializer.
+   */
+  public static JsonSerialization<SinglePendingCommit> serializer() {
+    return new JsonSerialization<>(SinglePendingCommit.class, false, true);
+  }
+
+  /**
+   * Load an instance from a file, then validate it.
+   * @param fs filesystem
+   * @param path path
+   * @return the loaded instance
+   * @throws IOException IO failure
+   * @throws ValidationFailure if the data is invalid
+   */
+  public static SinglePendingCommit load(FileSystem fs, Path path)
+      throws IOException {
+    SinglePendingCommit instance = serializer().load(fs, path);
+    instance.filename = path.toString();
+    instance.validate();
+    return instance;
+  }
+
+  /**
+   * Deserialize via java Serialization API: deserialize the instance
+   * and then call {@link #validate()} to verify that the deserialized
+   * data is valid.
+   * @param inStream input stream
+   * @throws IOException IO problem
+   * @throws ClassNotFoundException reflection problems
+   * @throws ValidationFailure validation failure
+   */
+  private void readObject(ObjectInputStream inStream) throws IOException,
+      ClassNotFoundException {
+    inStream.defaultReadObject();
+    validate();
+  }
+
+  /**
+   * Set the various timestamp fields to the supplied value.
+   * @param millis time in milliseconds
+   */
+  public void touch(long millis) {
+    created = millis;
+    saved = millis;
+    date = new Date(millis).toString();
+  }
+
+  /**
+   * Set the commit data.
+   * @param parts ordered list of etags.
+   * @throws ValidationFailure if the data is invalid
+   */
+  public void bindCommitData(List<PartETag> parts) throws ValidationFailure {
+    etags = new ArrayList<>(parts.size());
+    int counter = 1;
+    for (PartETag part : parts) {
+      verify(part.getPartNumber() == counter,
+          "Expected part number %s but got %s", counter, part.getPartNumber());
+      etags.add(part.getETag());
+      counter++;
+    }
+  }
+
+  @Override
+  public void validate() throws ValidationFailure {
+    verify(version == VERSION, "Wrong version: %s", version);
+    verify(StringUtils.isNotEmpty(bucket), "Empty bucket");
+    verify(StringUtils.isNotEmpty(destinationKey),
+        "Empty destination");
+    verify(StringUtils.isNotEmpty(uploadId), "Empty uploadId");
+    verify(length >= 0, "Invalid length: %s", length);
+    destinationPath();
+    verify(etags != null, "No etag list");
+    validateCollectionClass(etags, String.class);
+    for (String etag : etags) {
+      verify(StringUtils.isNotEmpty(etag), "Empty etag");
+    }
+    if (extraData != null) {
+      validateCollectionClass(extraData.keySet(), String.class);
+      validateCollectionClass(extraData.values(), String.class);
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "SinglePendingCommit{");
+    sb.append("version=").append(version);
+    sb.append(", uri='").append(uri).append('\'');
+    sb.append(", destination='").append(destinationKey).append('\'');
+    sb.append(", uploadId='").append(uploadId).append('\'');
+    sb.append(", created=").append(created);
+    sb.append(", saved=").append(saved);
+    sb.append(", size=").append(length);
+    sb.append(", date='").append(date).append('\'');
+    sb.append(", jobId='").append(jobId).append('\'');
+    sb.append(", taskId='").append(taskId).append('\'');
+    sb.append(", notes='").append(text).append('\'');
+    if (etags != null) {
+      sb.append(", etags=[");
+      sb.append(join(",", etags));
+      sb.append(']');
+    } else {
+      sb.append(", etags=null");
+    }
+    sb.append('}');
+    return sb.toString();
+  }
+
+  @Override
+  public byte[] toBytes() throws IOException {
+    validate();
+    return serializer().toBytes(this);
+  }
+
+  @Override
+  public void save(FileSystem fs, Path path, boolean overwrite)
+      throws IOException {
+    serializer().save(fs, path, this, overwrite);
+  }
+
+  /**
+   * Build the destination path of the object.
+   * @return the path
+   * @throws IllegalStateException if the URI is invalid
+   */
+  public Path destinationPath() {
+    Preconditions.checkState(StringUtils.isNotEmpty(uri), "Empty uri");
+    try {
+      return new Path(new URI(uri));
+    } catch (URISyntaxException e) {
+      throw new IllegalStateException("Cannot parse URI " + uri, e);
+    }
+  }
+
+  /**
+   * Get the number of etags.
+   * @return the size of the etag list.
+   */
+  public int getPartCount() {
+    return etags.size();
+  }
+
+  /**
+   * Iterate over the etags.
+   * @return an iterator.
+   */
+  @Override
+  public Iterator<String> iterator() {
+    return etags.iterator();
+  }
+
+  /** @return version marker. */
+  public int getVersion() {
+    return version;
+  }
+
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+  /**
+   * This is the filename of the pending file itself.
+   * Used during processing; its persisted value, if any, is ignored.
+   * @return filename
+   */
+  public String getFilename() {
+    return filename;
+  }
+
+  public void setFilename(String filename) {
+    this.filename = filename;
+  }
+
+  /** @return path URI of the destination. */
+  public String getUri() {
+    return uri;
+  }
+
+  public void setUri(String uri) {
+    this.uri = uri;
+  }
+
+  /** @return ID of the upload. */
+  public String getUploadId() {
+    return uploadId;
+  }
+
+  public void setUploadId(String uploadId) {
+    this.uploadId = uploadId;
+  }
+
+  /** @return destination bucket. */
+  public String getBucket() {
+    return bucket;
+  }
+
+  public void setBucket(String bucket) {
+    this.bucket = bucket;
+  }
+
+  /** @return destination key in the bucket. */
+  public String getDestinationKey() {
+    return destinationKey;
+  }
+
+  public void setDestinationKey(String destinationKey) {
+    this.destinationKey = destinationKey;
+  }
+
+  /**
+   * When was the upload created?
+   * @return timestamp
+   */
+  public long getCreated() {
+    return created;
+  }
+
+  public void setCreated(long created) {
+    this.created = created;
+  }
+
+  /**
+   * When was the upload saved?
+   * @return timestamp
+   */
+  public long getSaved() {
+    return saved;
+  }
+
+  public void setSaved(long saved) {
+    this.saved = saved;
+  }
+
+  /**
+   * Timestamp as date; no expectation of parseability.
+   * @return date string
+   */
+  public String getDate() {
+    return date;
+  }
+
+  public void setDate(String date) {
+    this.date = date;
+  }
+
+  /** @return Job ID, if known. */
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+
+  /** @return Task ID, if known. */
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * Arbitrary notes.
+   * @return any notes
+   */
+  public String getText() {
+    return text;
+  }
+
+  public void setText(String text) {
+    this.text = text;
+  }
+
+  /** @return ordered list of etags. */
+  public List<String> getEtags() {
+    return etags;
+  }
+
+  public void setEtags(List<String> etags) {
+    this.etags = etags;
+  }
+
+  /**
+   * Any custom extra data committer subclasses may choose to add.
+   * @return custom data
+   */
+  public Map<String, String> getExtraData() {
+    return extraData;
+  }
+
+  public void setExtraData(Map<String, String> extraData) {
+    this.extraData = extraData;
+  }
+
+  /**
+   * Destination file size.
+   * @return size of destination object
+   */
+  public long getLength() {
+    return length;
+  }
+
+  public void setLength(long length) {
+    this.length = length;
+  }
+}
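
The expected lifecycle mirrors what MagicCommitTracker does further below:
populate the fields when the multipart upload is created, bind the part etags
as the parts complete, then persist. A sketch in which
bucket/destKey/destPath/uploadId/partETags/bytesWritten are all hypothetical
values from an in-flight upload:

    SinglePendingCommit commit = new SinglePendingCommit();
    commit.touch(System.currentTimeMillis());
    commit.setBucket(bucket);
    commit.setDestinationKey(destKey);
    commit.setUri(destPath.toUri().toString());
    commit.setUploadId(uploadId);
    commit.setLength(bytesWritten);
    commit.bindCommitData(partETags);  // verifies parts are numbered 1..n
    byte[] json = commit.toBytes();    // validates, then serializes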

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
new file mode 100644
index 0000000..6cf1f1e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SuccessData.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.files;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.ValidationFailure;
+import org.apache.hadoop.util.JsonSerialization;
+
+/**
+ * Summary data saved into a {@code _SUCCESS} marker file.
+ *
+ * This provides an easy way to determine which committer was used
+ * to commit work.
+ * <ol>
+ *   <li>File length == 0: classic {@code FileOutputCommitter}.</li>
+ *   <li>Loadable as {@link SuccessData}:
+ *   An S3Guard committer, with its name in the {@link #committer} field.</li>
+ *   <li>Not loadable? Something else.</li>
+ * </ol>
+ *
+ * This is an unstable structure intended for diagnostics and testing.
+ * Applications reading this data should use/check the {@link #name} field
+ * to differentiate from any other JSON-based manifest and to identify
+ * changes in the output format.
+ */
+@SuppressWarnings("unused")
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SuccessData extends PersistentCommitData {
+  private static final Logger LOG = LoggerFactory.getLogger(SuccessData.class);
+
+  /**
+   * Serialization ID: {@value}.
+   */
+  private static final long serialVersionUID = 507133045258460084L;
+
+  /**
+   * Name to include in persisted data, so as to differentiate from
+   * any other manifests: {@value}.
+   */
+  public static final String NAME
+      = "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1";
+
+  /**
+   * Name of file; includes version marker.
+   */
+  private String name;
+
+  /** Timestamp of creation. */
+  private long timestamp;
+
+  /** Timestamp as date string; no expectation of parseability. */
+  private String date;
+
+  /**
+   * Host which created the file (implicitly: committed the work).
+   */
+  private String hostname;
+
+  /**
+   * Committer name.
+   */
+  private String committer;
+
+  /**
+   * Description text.
+   */
+  private String description;
+
+  /**
+   * Metrics.
+   */
+  private Map<String, Long> metrics = new HashMap<>();
+
+  /**
+   * Diagnostics information.
+   */
+  private Map<String, String> diagnostics = new HashMap<>();
+
+  /**
+   * Filenames in the commit.
+   */
+  private List<String> filenames = new ArrayList<>(0);
+
+  @Override
+  public void validate() throws ValidationFailure {
+    ValidationFailure.verify(name != null,
+        "Incompatible file format: no 'name' field");
+    ValidationFailure.verify(NAME.equals(name),
+        "Incompatible file format: %s", name);
+  }
+
+  @Override
+  public byte[] toBytes() throws IOException {
+    return serializer().toBytes(this);
+  }
+
+  @Override
+  public void save(FileSystem fs, Path path, boolean overwrite)
+      throws IOException {
+    // always set the name field before being saved.
+    name = NAME;
+    serializer().save(fs, path, this, overwrite);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "SuccessData{");
+    sb.append("committer='").append(committer).append('\'');
+    sb.append(", hostname='").append(hostname).append('\'');
+    sb.append(", description='").append(description).append('\'');
+    sb.append(", date='").append(date).append('\'');
+    sb.append(", filenames=[").append(
+        StringUtils.join(filenames, ", "))
+        .append("]");
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Dump the metrics (if any) to a string.
+   * The metrics are sorted for ease of viewing.
+   * @param prefix prefix before every entry
+   * @param middle string between key and value
+   * @param suffix suffix to each entry
+   * @return the dumped string
+   */
+  public String dumpMetrics(String prefix, String middle, String suffix) {
+    return joinMap(metrics, prefix, middle, suffix);
+  }
+
+  /**
+   * Dump the diagnostics (if any) to a string.
+   * @param prefix prefix before every entry
+   * @param middle string between key and value
+   * @param suffix suffix to each entry
+   * @return the dumped string
+   */
+  public String dumpDiagnostics(String prefix, String middle, String suffix) {
+    return joinMap(diagnostics, prefix, middle, suffix);
+  }
+
+  /**
+   * Join any map of string to value into a string, sorting the keys first.
+   * @param map map to join
+   * @param prefix prefix before every entry
+   * @param middle string between key and value
+   * @param suffix suffix to each entry
+   * @return a string for reporting.
+   */
+  protected static String joinMap(Map<String, ?> map,
+      String prefix,
+      String middle, String suffix) {
+    if (map == null) {
+      return "";
+    }
+    List<String> list = new ArrayList<>(map.keySet());
+    Collections.sort(list);
+    StringBuilder sb = new StringBuilder(list.size() * 32);
+    for (String k : list) {
+      sb.append(prefix)
+          .append(k)
+          .append(middle)
+          .append(map.get(k))
+          .append(suffix);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Load an instance from a file, then validate it.
+   * @param fs filesystem
+   * @param path path
+   * @return the loaded instance
+   * @throws IOException IO failure
+   * @throws ValidationFailure if the data is invalid
+   */
+  public static SuccessData load(FileSystem fs, Path path)
+      throws IOException {
+    LOG.debug("Reading success data from {}", path);
+    SuccessData instance = serializer().load(fs, path);
+    instance.validate();
+    return instance;
+  }
+
+  /**
+   * Get a JSON serializer for this class.
+   * @return a serializer.
+   */
+  private static JsonSerialization<SuccessData> serializer() {
+    return new JsonSerialization<>(SuccessData.class, false, true);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /** @return timestamp of creation. */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  /** @return timestamp as date; no expectation of parseability. */
+  public String getDate() {
+    return date;
+  }
+
+  public void setDate(String date) {
+    this.date = date;
+  }
+
+  /**
+   * @return host which created the file (implicitly: committed the work).
+   */
+  public String getHostname() {
+    return hostname;
+  }
+
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
+
+  /**
+   * @return committer name.
+   */
+  public String getCommitter() {
+    return committer;
+  }
+
+  public void setCommitter(String committer) {
+    this.committer = committer;
+  }
+
+  /**
+   * @return any description text.
+   */
+  public String getDescription() {
+    return description;
+  }
+
+  public void setDescription(String description) {
+    this.description = description;
+  }
+
+  /**
+   * @return any metrics.
+   */
+  public Map<String, Long> getMetrics() {
+    return metrics;
+  }
+
+  public void setMetrics(Map<String, Long> metrics) {
+    this.metrics = metrics;
+  }
+
+  /**
+   * @return a list of filenames in the commit.
+   */
+  public List<String> getFilenames() {
+    return filenames;
+  }
+
+  public void setFilenames(List<String> filenames) {
+    this.filenames = filenames;
+  }
+
+  public Map<String, String> getDiagnostics() {
+    return diagnostics;
+  }
+
+  public void setDiagnostics(Map<String, String> diagnostics) {
+    this.diagnostics = diagnostics;
+  }
+
+  /**
+   * Add a diagnostics entry.
+   * @param key name
+   * @param value value
+   */
+  public void addDiagnostic(String key, String value) {
+    diagnostics.put(key, value);
+  }
+}
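
Turning the probe sequence from the class javadoc into code, a sketch in
which fs and outputDir are hypothetical:

    Path marker = new Path(outputDir, "_SUCCESS");
    FileStatus status = fs.getFileStatus(marker);
    if (status.getLen() == 0) {
      // zero-byte marker: the classic FileOutputCommitter
    } else {
      // an S3A committer: load the manifest and inspect it
      SuccessData success = SuccessData.load(fs, marker);
      LOG.info("Committer: {}", success.getCommitter());
      LOG.info("{}", success.dumpMetrics("  ", " = ", "\n"));
    }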

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/package-info.java
new file mode 100644
index 0000000..0d57494
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/package-info.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Persistent data formats for the committers.
+ *
+ * All of these formats share a base class of
+ * {@link org.apache.hadoop.fs.s3a.commit.files.PersistentCommitData};
+ * the subclasses record
+ * <ol>
+ *   <li>The content of a single pending commit
+ *   (used by the Magic committer).</li>
+ *   <li>The list of all the files uploaded by a staging committer.</li>
+ *   <li>The summary information saved in the {@code _SUCCESS} file.</li>
+ * </ol>
+ *
+ * There are no guarantees of stability between versions; these are internal
+ * structures.
+ *
+ * The {@link org.apache.hadoop.fs.s3a.commit.files.SuccessData} file is
+ * the one visible to callers after a job completes; it is an unstable
+ * manifest intended for testing only.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.s3a.commit.files;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
new file mode 100644
index 0000000..cf365c2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.PutTracker;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+
+/**
+ * Put tracker for Magic commits.
+ * <p><b>Important</b>: must not directly or indirectly import a class which
+ * uses any datatype in hadoop-mapreduce.</p>
+ */
+@InterfaceAudience.Private
+public class MagicCommitTracker extends PutTracker {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      MagicCommitTracker.class);
+
+  private final String originalDestKey;
+  private final String pendingPartKey;
+  private final Path path;
+  private final WriteOperationHelper writer;
+  private final String bucket;
+  private static final byte[] EMPTY = new byte[0];
+
+  /**
+   * Magic commit tracker.
+   * @param path path nominally being written to
+   * @param bucket dest bucket
+   * @param originalDestKey the original key, in the magic directory.
+   * @param destKey key for the destination
+   * @param pendingsetKey key of the pendingset file
+   * @param writer writer instance to use for operations
+   */
+  public MagicCommitTracker(Path path,
+      String bucket,
+      String originalDestKey,
+      String destKey,
+      String pendingsetKey,
+      WriteOperationHelper writer) {
+    super(destKey);
+    this.bucket = bucket;
+    this.path = path;
+    this.originalDestKey = originalDestKey;
+    this.pendingPartKey = pendingsetKey;
+    this.writer = writer;
+    LOG.info("File {} is written as magic file to path {}",
+        path, destKey);
+  }
+
+  /**
+   * Initialize the tracker.
+   * @return true, indicating that the multipart commit must start.
+   * @throws IOException any IO problem.
+   */
+  @Override
+  public boolean initialize() throws IOException {
+    return true;
+  }
+
+  /**
+   * Flag to indicate whether output is immediately visible after the stream
+   * is closed; magic output is not.
+   * @return false
+   */
+  @Override
+  public boolean outputImmediatelyVisible() {
+    return false;
+  }
+
+  /**
+   * Complete operation: generate the final commit data, put it.
+   * @param uploadId Upload ID
+   * @param parts list of parts
+   * @param bytesWritten bytes written
+   * @return false, indicating that the multipart upload must not be completed yet.
+   * @throws IOException any IO problem.
+   * @throws IllegalArgumentException bad argument
+   */
+  @Override
+  public boolean aboutToComplete(String uploadId,
+      List<PartETag> parts,
+      long bytesWritten)
+      throws IOException {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
+        "empty/null upload ID: " + uploadId);
+    Preconditions.checkArgument(parts != null,
+        "No uploaded parts list");
+    Preconditions.checkArgument(!parts.isEmpty(),
+        "No uploaded parts to save");
+    SinglePendingCommit commitData = new SinglePendingCommit();
+    commitData.touch(System.currentTimeMillis());
+    commitData.setDestinationKey(getDestKey());
+    commitData.setBucket(bucket);
+    commitData.setUri(path.toUri().toString());
+    commitData.setUploadId(uploadId);
+    commitData.setText("");
+    commitData.setLength(bytesWritten);
+    commitData.bindCommitData(parts);
+    byte[] bytes = commitData.toBytes();
+    LOG.info("Uncommitted data pending to file {};"
+            + " commit metadata for {} parts in {}. size: {} byte(s)",
+        path.toUri(), parts.size(), pendingPartKey, bytesWritten);
+    LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
+        path, pendingPartKey, commitData);
+    PutObjectRequest put = writer.createPutObjectRequest(
+        pendingPartKey,
+        new ByteArrayInputStream(bytes),
+        bytes.length);
+    writer.uploadObject(put);
+
+    // now put a 0-byte file with the name of the original under-magic path
+    PutObjectRequest originalDestPut = writer.createPutObjectRequest(
+        originalDestKey,
+        new ByteArrayInputStream(EMPTY),
+        0);
+    writer.uploadObject(originalDestPut);
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "MagicCommitTracker{");
+    sb.append("destKey=").append(getDestKey());
+    sb.append(", pendingPartKey='").append(pendingPartKey).append('\'');
+    sb.append(", path=").append(path);
+    sb.append(", writer=").append(writer);
+    sb.append('}');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
new file mode 100644
index 0000000..c305141
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitOperations;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR;
+import org.apache.hadoop.fs.s3a.commit.DurationInfo;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
+
+/**
+ * This is a dedicated committer which requires the "magic" directory feature
+ * of the S3A Filesystem to be enabled; it then uses magic paths for task
+ * and job attempts, so as to ensure that the final output goes directly
+ * to the destination directory.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MagicS3GuardCommitter extends AbstractS3ACommitter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MagicS3GuardCommitter.class);
+
+  /** Name: {@value}. */
+  public static final String NAME = CommitConstants.COMMITTER_NAME_MAGIC;
+
+  /**
+   * Create a task committer.
+   * @param outputPath the job's output path
+   * @param context the task's context
+   * @throws IOException on a failure
+   */
+  public MagicS3GuardCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    setWorkPath(getTaskAttemptPath(context));
+    verifyIsMagicCommitPath(getDestS3AFS(), getWorkPath());
+    LOG.debug("Task attempt {} has work path {}",
+        context.getTaskAttemptID(),
+        getWorkPath());
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /**
+   * Require magic paths in the FS client.
+   * @return true, always.
+   */
+  @Override
+  protected boolean requiresDelayedCommitOutputInFileSystem() {
+    return true;
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Setup Job %s", jobIdString(context))) {
+      Path jobAttemptPath = getJobAttemptPath(context);
+      getDestinationFS(jobAttemptPath,
+          context.getConfiguration()).mkdirs(jobAttemptPath);
+    }
+  }
+
+  /**
+   * Get the list of pending uploads for this job attempt, by listing
+   * all .pendingset files in the job attempt directory.
+   * @param context job context
+   * @return a list of pending commits.
+   * @throws IOException Any IO failure
+   */
+  protected List<SinglePendingCommit> listPendingUploadsToCommit(
+      JobContext context)
+      throws IOException {
+    FileSystem fs = getDestFS();
+    return loadPendingsetFiles(context, false, fs,
+        listAndFilter(fs, getJobAttemptPath(context), false,
+            CommitOperations.PENDINGSET_FILTER));
+  }
+
+  /**
+   * Delete the magic directory.
+   */
+  public void cleanupStagingDirs() {
+    Path path = magicSubdir(getOutputPath());
+    Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),
+        () -> deleteWithWarning(getDestFS(), path, true));
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Setup Task %s", context.getTaskAttemptID())) {
+      Path taskAttemptPath = getTaskAttemptPath(context);
+      FileSystem fs = taskAttemptPath.getFileSystem(getConf());
+      fs.mkdirs(taskAttemptPath);
+    }
+  }
+
+  /**
+   * Did this task write any files in the work directory?
+   * Probes for the task having written data by checking whether the
+   * attempt directory exists. This adds extra HTTP requests; it may be
+   * better simply to return true and rely on the task commit to do the work.
+   * @param context the task's context
+   * @return true if the attempt path exists
+   * @throws IOException failure to list the path
+   */
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context)
+      throws IOException {
+    Path taskAttemptPath = getTaskAttemptPath(context);
+    try (DurationInfo d = new DurationInfo(LOG,
+        "needsTaskCommit task %s", context.getTaskAttemptID())) {
+      return taskAttemptPath.getFileSystem(
+          context.getConfiguration())
+          .exists(taskAttemptPath);
+    }
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Commit task %s", context.getTaskAttemptID())) {
+      PendingSet commits = innerCommitTask(context);
+      LOG.info("Task {} committed {} files", context.getTaskAttemptID(),
+          commits.size());
+    } catch (IOException e) {
+      getCommitOperations().taskCompleted(false);
+      throw e;
+    } finally {
+      // delete the task attempt so there's no possibility of a second attempt
+      deleteTaskAttemptPathQuietly(context);
+    }
+    getCommitOperations().taskCompleted(true);
+  }
+
+  /**
+   * Inner routine for committing a task.
+   * The list of pending commits is loaded and then saved to the job attempt
+   * dir.
+   * Failure to load any file or save the final file triggers an abort of
+   * all known pending commits.
+   * @param context context
+   * @return the summary file
+   * @throws IOException exception
+   */
+  private PendingSet innerCommitTask(
+      TaskAttemptContext context) throws IOException {
+    Path taskAttemptPath = getTaskAttemptPath(context);
+    // load in all pending commits.
+    CommitOperations actions = getCommitOperations();
+    Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>>
+        loaded = actions.loadSinglePendingCommits(
+            taskAttemptPath, true);
+    PendingSet pendingSet = loaded.getKey();
+    List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
+    if (!failures.isEmpty()) {
+      // At least one file failed to load
+      // abort all which did load; report failure with the first exception
+      LOG.error("At least one commit file could not be read: failing");
+      abortPendingUploads(context, pendingSet.getCommits(), true);
+      throw failures.get(0).getValue();
+    }
+    // patch in IDs
+    String jobId = String.valueOf(context.getJobID());
+    String taskId = String.valueOf(context.getTaskAttemptID());
+    for (SinglePendingCommit commit : pendingSet.getCommits()) {
+      commit.setJobId(jobId);
+      commit.setTaskId(taskId);
+    }
+
+    Path jobAttemptPath = getJobAttemptPath(context);
+    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
+    Path taskOutcomePath = new Path(jobAttemptPath,
+        taskAttemptID.getTaskID().toString() +
+        CommitConstants.PENDINGSET_SUFFIX);
+    LOG.info("Saving work of {} to {}", taskAttemptID, taskOutcomePath);
+    try {
+      pendingSet.save(getDestFS(), taskOutcomePath, false);
+    } catch (IOException e) {
+      LOG.warn("Failed to save task commit data to {} ",
+          taskOutcomePath, e);
+      abortPendingUploads(context, pendingSet.getCommits(), true);
+      throw e;
+    }
+    return pendingSet;
+  }
+
+  /**
+   * Abort a task. Attempt load then abort all pending files,
+   * then try to delete the task attempt path.
+   * This method may be called on the job committer, rather than the
+   * task one (such as in the MapReduce AM after a task container failure).
+   * It must extract all paths and state from the passed in context.
+   * @param context task context
+   * @throws IOException if there was some problem querying the path other
+   * than it not actually existing.
+   */
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    Path attemptPath = getTaskAttemptPath(context);
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Abort task %s", context.getTaskAttemptID())) {
+      getCommitOperations().abortAllSinglePendingCommits(attemptPath, true);
+    } finally {
+      deleteQuietly(
+          attemptPath.getFileSystem(context.getConfiguration()),
+          attemptPath, true);
+    }
+  }
+
+  /**
+   * Compute the path where the output of a given job attempt will be placed.
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path to store job attempt data.
+   */
+  protected Path getJobAttemptPath(int appAttemptId) {
+    return getMagicJobAttemptPath(appAttemptId, getOutputPath());
+  }
+
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed.
+   *
+   * @param context the context of the task attempt.
+   * @return the path where a task attempt should be stored.
+   */
+  public Path getTaskAttemptPath(TaskAttemptContext context) {
+    return getMagicTaskAttemptPath(context, getOutputPath());
+  }
+
+  @Override
+  protected Path getBaseTaskAttemptPath(TaskAttemptContext context) {
+    return getBaseMagicTaskAttemptPath(context, getOutputPath());
+  }
+
+  /**
+   * Get a temporary directory for data. When a task is aborted/cleaned
+   * up, the contents of this directory are all deleted.
+   * @param context task context
+   * @return a path for temporary data.
+   */
+  public Path getTempTaskAttemptPath(TaskAttemptContext context) {
+    return CommitUtilsWithMR.getTempTaskAttemptPath(context, getOutputPath());
+  }
+
+}
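
As a usage note, a job opts into this committer through configuration rather than code. A hedged sketch follows; the key names (fs.s3a.committer.magic.enabled, fs.s3a.committer.name, and the per-scheme factory key) are assumed to match the constants introduced elsewhere in this patch series, so verify them against the merged tree.

    import org.apache.hadoop.conf.Configuration;

    /**
     * Sketch: binding a job to the magic committer. Key names are assumed
     * to match CommitConstants; verify against the merged source tree.
     */
    public final class MagicCommitterBindingSketch {

      public static Configuration bind(Configuration conf) {
        // enable "magic" path interception in the S3A filesystem client
        conf.setBoolean("fs.s3a.committer.magic.enabled", true);
        // select this committer by its registered name
        conf.set("fs.s3a.committer.name", "magic");
        // route s3a:// output through the S3A committer factory
        conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
        return conf;
      }
    }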

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java
new file mode 100644
index 0000000..0392491
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+
+/**
+ * Factory for the Magic committer.
+ */
+public class MagicS3GuardCommitterFactory
+    extends AbstractS3ACommitterFactory {
+
+  /**
+   * Name of this class: {@value}.
+   */
+  public static final String CLASSNAME
+      = "org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory";
+
+  public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return new MagicS3GuardCommitter(outputPath, context);
+  }
+
+}
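
The CLASSNAME constant exists so this factory can be named in configuration. A sketch of binding it directly for the s3a scheme, assuming the "mapreduce.outputcommitter.factory.scheme.<scheme>" convention of PathOutputCommitterFactory holds; treat the key as an assumption.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory;

    /**
     * Sketch: wiring this factory straight to the s3a scheme, bypassing
     * the name-based committer selection. Illustrative only.
     */
    public final class DirectFactoryBindingSketch {

      public static void bind(Configuration conf) {
        conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
            MagicS3GuardCommitterFactory.CLASSNAME);
      }
    }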

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/package-info.java
new file mode 100644
index 0000000..56925e9
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This is the "Magic" committer and support.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/package-info.java
new file mode 100644
index 0000000..7647675
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for committing the output of analytics jobs directly to S3.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/ConflictResolution.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/ConflictResolution.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/ConflictResolution.java
new file mode 100644
index 0000000..52d3411
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/ConflictResolution.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+/**
+ * Enum of conflict resolution algorithms.
+ */
+public enum ConflictResolution {
+  /** Fail. */
+  FAIL,
+
+  /** Merge new data with existing data. */
+  APPEND,
+
+  /** Overwrite existing data. */
+  REPLACE
+}
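
Since the enum constants are upper case while configuration values are conventionally lower case, a committer resolving its mode has to normalize before parsing. A sketch follows, in which the key name and the "fail" default are assumptions made for illustration, not taken from this patch.

    import java.util.Locale;

    import org.apache.hadoop.conf.Configuration;

    /**
     * Sketch: resolving a ConflictResolution mode from configuration.
     * The key name and default value are assumed, not part of this patch.
     */
    public final class ConflictModeSketch {

      /** Assumed key name for the staging conflict mode. */
      public static final String CONFLICT_MODE =
          "fs.s3a.committer.staging.conflict-mode";

      public static ConflictResolution resolve(Configuration conf) {
        // valueOf() is case-sensitive, so normalize the configured string
        String mode = conf.getTrimmed(CONFLICT_MODE, "fail");
        return ConflictResolution.valueOf(mode.toUpperCase(Locale.ENGLISH));
      }
    }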

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
new file mode 100644
index 0000000..03dfed2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
+
+/**
+ * This commits to a directory.
+ * The conflict policy is
+ * <ul>
+ *   <li>FAIL: fail the commit</li>
+ *   <li>APPEND: add extra data to the destination.</li>
+ *   <li>REPLACE: delete the destination directory in the job commit
+ *   (i.e. after, and only if, all tasks have succeeded).</li>
+ * </ul>
+ */
+public class DirectoryStagingCommitter extends StagingCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DirectoryStagingCommitter.class);
+
+  /** Name: {@value}. */
+  public static final String NAME = COMMITTER_NAME_DIRECTORY;
+
+  public DirectoryStagingCommitter(Path outputPath, TaskAttemptContext context)
+      throws IOException {
+    super(outputPath, context);
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    super.setupJob(context);
+    Path outputPath = getOutputPath();
+    FileSystem fs = getDestFS();
+    if (getConflictResolutionMode(context, fs.getConf())
+        == ConflictResolution.FAIL
+        && fs.exists(outputPath)) {
+      LOG.debug("Failing commit by task attempt {} to write"
+              + " to existing output path {}",
+          context.getJobID(), getOutputPath());
+      throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS);
+    }
+  }
+
+  /**
+   * Pre-commit actions for a job.
+   * Here: look at the conflict resolution mode and choose
+   * an action based on the current policy.
+   * @param context job context
+   * @param pending pending commits
+   * @throws IOException any failure
+   */
+  @Override
+  protected void preCommitJob(JobContext context,
+      List<SinglePendingCommit> pending) throws IOException {
+    Path outputPath = getOutputPath();
+    FileSystem fs = getDestFS();
+    Configuration fsConf = fs.getConf();
+    switch (getConflictResolutionMode(context, fsConf)) {
+    case FAIL:
+      // this was checked in setupJob, but this avoids some cases where
+      // output was created while the job was processing
+      if (fs.exists(outputPath)) {
+        throw new PathExistsException(outputPath.toString(), E_DEST_EXISTS);
+      }
+      break;
+    case APPEND:
+      // do nothing
+      break;
+    case REPLACE:
+      if (fs.delete(outputPath, true /* recursive */)) {
+        LOG.info("{}: removed output path to be replaced: {}",
+            getRole(), outputPath);
+      }
+      break;
+    default:
+      throw new IOException(getRole() + ": unknown conflict resolution mode: "
+          + getConflictResolutionMode(context, fsConf));
+    }
+  }
+}
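
A job picks between these policies per run through configuration. A short sketch, assuming the same staging conflict-mode key as above and the committer name this file aliases; both key names should be checked against the merged CommitConstants.

    import org.apache.hadoop.conf.Configuration;

    /**
     * Sketch: running the directory committer with REPLACE semantics,
     * so the destination directory is deleted only at job commit time.
     * Key names are assumptions; verify against CommitConstants.
     */
    public final class DirectoryCommitterJobSketch {

      public static Configuration configure(Configuration conf) {
        conf.set("fs.s3a.committer.name", "directory");
        conf.set("fs.s3a.committer.staging.conflict-mode", "replace");
        return conf;
      }
    }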

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java
new file mode 100644
index 0000000..bfa8914
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/DirectoryStagingCommitterFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+
+/**
+ * Factory for the Directory committer.
+ */
+public class DirectoryStagingCommitterFactory
+    extends AbstractS3ACommitterFactory {
+
+  /**
+   * Name of this class: {@value}.
+   */
+  public static final String CLASSNAME
+      = "org.apache.hadoop.fs.s3a.commit.staging"
+      + ".DirectoryStagingCommitterFactory";
+
+  public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return new DirectoryStagingCommitter(outputPath, context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
new file mode 100644
index 0000000..bfaf443
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
+
+/**
+ * Partitioned committer.
+ * This writes data to specific "partition" subdirectories, applying
+ * conflict resolution on a partition-by-partition basis. The existence
+ * and state of any parallel partitions for which there are no output
+ * files are not considered in the conflict resolution.
+ *
+ * The conflict policy is
+ * <ul>
+ *   <li>FAIL: fail the commit if any of the partitions have data.</li>
+ *   <li>APPEND: add extra data to the destination partitions.</li>
+ *   <li>REPLACE: delete the destination partition in the job commit
+ *   (i.e. after, and only if, all tasks have succeeded).</li>
+ * </ul>
+ */
+public class PartitionedStagingCommitter extends StagingCommitter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      PartitionedStagingCommitter.class);
+
+  /** Name: {@value}. */
+  public static final String NAME = COMMITTER_NAME_PARTITIONED;
+
+  public PartitionedStagingCommitter(Path outputPath,
+      TaskAttemptContext context)
+      throws IOException {
+    super(outputPath, context);
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "PartitionedStagingCommitter{");
+    sb.append(super.toString());
+    sb.append('}');
+    return sb.toString();
+  }
+
+  @Override
+  protected int commitTaskInternal(TaskAttemptContext context,
+      List<? extends FileStatus> taskOutput) throws IOException {
+    Path attemptPath = getTaskAttemptPath(context);
+    Set<String> partitions = Paths.getPartitions(attemptPath, taskOutput);
+
+    // enforce conflict resolution, but only if the mode is FAIL. For APPEND,
+    // it doesn't matter that the partitions are already there, and for
+    // REPLACE, deletion should be done during job commit.
+    FileSystem fs = getDestFS();
+    if (getConflictResolutionMode(context, fs.getConf())
+        == ConflictResolution.FAIL) {
+      for (String partition : partitions) {
+        // getFinalPath adds the UUID to the file name. this needs the parent.
+        Path partitionPath = getFinalPath(partition + "/file",
+            context).getParent();
+        if (fs.exists(partitionPath)) {
+          LOG.debug("Failing commit by task attempt {} to write"
+              + " to existing path {} under {}",
+              context.getTaskAttemptID(), partitionPath, getOutputPath());
+          throw new PathExistsException(partitionPath.toString(),
+              E_DEST_EXISTS);
+        }
+      }
+    }
+    return super.commitTaskInternal(context, taskOutput);
+  }
+
+  /**
+   * Job-side conflict resolution.
+   * The partition path conflict resolution actions are:
+   * <ol>
+   *   <li>FAIL: assume checking has taken place earlier; no more checks.</li>
+   *   <li>APPEND: allowed; no need to check.</li>
+   *   <li>REPLACE deletes all existing partitions.</li>
+   * </ol>
+   * @param context job context
+   * @param pending the pending operations
+   * @throws IOException any failure
+   */
+  @Override
+  protected void preCommitJob(JobContext context,
+      List<SinglePendingCommit> pending) throws IOException {
+
+    FileSystem fs = getDestFS();
+
+    // enforce conflict resolution
+    Configuration fsConf = fs.getConf();
+    switch (getConflictResolutionMode(context, fsConf)) {
+    case FAIL:
+      // FAIL checking is done on the task side, so this does nothing
+      break;
+    case APPEND:
+      // no check is needed because the output may exist for appending
+      break;
+    case REPLACE:
+      Set<Path> partitions = pending.stream()
+          .map(SinglePendingCommit::destinationPath)
+          .map(Path::getParent)
+          .collect(Collectors.toCollection(Sets::newLinkedHashSet));
+      for (Path partitionPath : partitions) {
+        LOG.debug("{}: removing partition path to be replaced: " +
+            getRole(), partitionPath);
+        fs.delete(partitionPath, true);
+      }
+      break;
+    default:
+      throw new PathCommitException("",
+          getRole() + ": unknown conflict resolution mode: "
+          + getConflictResolutionMode(context, fsConf));
+    }
+  }
+
+}
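
The REPLACE branch above derives its delete targets by collapsing the pending destination files to their distinct parent directories, keeping first-seen order so the deletes are deterministic. A standalone, JDK-only restatement of that step (the Guava Sets call above does the same thing):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.hadoop.fs.Path;

    /**
     * Sketch: collapsing pending destinations to distinct partition
     * directories, mirroring the REPLACE branch. Illustrative only.
     */
    public final class PartitionParentsSketch {

      public static Set<Path> parents(List<Path> destinations) {
        // LinkedHashSet preserves first-seen order for deterministic deletes
        return destinations.stream()
            .map(Path::getParent)
            .collect(Collectors.toCollection(LinkedHashSet::new));
      }

      public static void main(String[] args) {
        List<Path> dests = Arrays.asList(
            new Path("s3a://bucket/out/year=2017/part-0000"),
            new Path("s3a://bucket/out/year=2017/part-0001"),
            new Path("s3a://bucket/out/year=2018/part-0000"));
        // prints the two distinct partition directories
        parents(dests).forEach(System.out::println);
      }
    }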

