mehakmeet commented on code in PR #5519:
URL: https://github.com/apache/hadoop/pull/5519#discussion_r1176027704


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java:
##########
@@ -67,98 +71,118 @@ protected CommitJobStage.Result executeStage(
         getJobId(),
         storeSupportsResilientCommit());
 
-    boolean createMarker = arguments.isCreateMarker();
+    // once the manifest has been loaded, a temp file needs to be
+    // deleted; so track teh value.

Review Comment:
   nit: typo, "teh" should be "the".



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CommitJobStage.java:
##########
@@ -67,98 +71,118 @@ protected CommitJobStage.Result executeStage(
         getJobId(),
         storeSupportsResilientCommit());
 
-    boolean createMarker = arguments.isCreateMarker();
+    // once the manifest has been loaded, a temp file needs to be
+    // deleted; so track teh value.
+    LoadedManifestData loadedManifestData = null;
+
+    try {
+      boolean createMarker = arguments.isCreateMarker();
+      IOStatisticsSnapshot heapInfo = new IOStatisticsSnapshot();
+      addHeapInformation(heapInfo, "setup");
+      // load the manifests
+      final StageConfig stageConfig = getStageConfig();
+      LoadManifestsStage.Result result = new 
LoadManifestsStage(stageConfig).apply(

Review Comment:
   suggestion: We could add a duration tracker around loading the manifests, so 
that the time taken shows up in the final stats.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file file
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    return createWriter(toPath(file));
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    return SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(FileEntry.class));
+  }
+
+
+  /**
+   * Reader is created with sequential reads.
+   * @param file file
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    return createReader(toPath(file));
+  }
+
+  /**
+   * Reader is created with sequential reads.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(path));
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * Closeable: cast and invoke to close the reader.
+   * @param reader reader;
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int 
capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } finally {
+      if (close) {
+        writer.close();
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   */
+  private static final class QueueEntry {
+
+    private final Actions action;
+
+    private final List<FileEntry> entries;
+
+    private QueueEntry(final Actions action, List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+  }
+
+  /**
+   * A Writer thread takes reads from a queue containing
+   * list of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * The these threads will be blocked when the queue capacity is reached.
+   * This is quite a complex process, with the main troublespots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in the 
process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * stop flag.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active.
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future invoked.
+     */
+    private Future<Integer> future;
+
+    /**
+     * count of file entries saved; only updated in one thread
+     * so volatile.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure = new 
AtomicReference<>();
+
+    /**
+     * Create.
+     * @param writer writer
+     * @param capacity capacity.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Is the writer active?
+     * @return true if the processor thread is live
+     */
+    public boolean isActive() {
+      return active.get();
+    }
+
+    /**
+     * Get count of files processed.
+     * @return the count
+     */
+    public int getCount() {
+      return count.get();
+    }
+
+    /**
+     * Any failure.
+     * @return any IOException caught when writing the output
+     */
+    public IOException getFailure() {
+      return failure.get();
+    }
+
+    /**
+     * Start the thread.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * @param entries entries.
+     * @return whether the queue worked.
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (active.get()) {
+        try {
+          queue.put(new QueueEntry(Actions.write, entries));

Review Comment:
   Suggestion: We could use `queue.offer(E e, long timeout, TimeUnit unit)`, 
so that we still wait for the queue to have capacity for the entry, but with a 
timeout in case something goes wrong. If the timeout expires, `offer()` returns 
false, and we can treat that as a failure. If the thread is interrupted while 
waiting, we can catch the `InterruptedException` and throw/swallow accordingly.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file file
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    return createWriter(toPath(file));
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    return SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(FileEntry.class));
+  }
+
+
+  /**
+   * Reader is created with sequential reads.
+   * @param file file
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    return createReader(toPath(file));
+  }
+
+  /**
+   * Reader is created with sequential reads.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(path));
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * Closeable: cast and invoke to close the reader.
+   * @param reader reader;
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int 
capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } finally {
+      if (close) {
+        writer.close();
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   */
+  private static final class QueueEntry {
+
+    private final Actions action;
+
+    private final List<FileEntry> entries;
+
+    private QueueEntry(final Actions action, List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+  }
+
+  /**
+   * A Writer thread takes reads from a queue containing
+   * list of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * The these threads will be blocked when the queue capacity is reached.
+   * This is quite a complex process, with the main troublespots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in the 
process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * stop flag.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active.
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future invoked.
+     */
+    private Future<Integer> future;
+
+    /**
+     * count of file entries saved; only updated in one thread
+     * so volatile.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure = new 
AtomicReference<>();
+
+    /**
+     * Create.
+     * @param writer writer
+     * @param capacity capacity.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Is the writer active?
+     * @return true if the processor thread is live
+     */
+    public boolean isActive() {
+      return active.get();
+    }
+
+    /**
+     * Get count of files processed.
+     * @return the count
+     */
+    public int getCount() {
+      return count.get();
+    }
+
+    /**
+     * Any failure.
+     * @return any IOException caught when writing the output
+     */
+    public IOException getFailure() {
+      return failure.get();
+    }
+
+    /**
+     * Start the thread.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * @param entries entries.
+     * @return whether the queue worked.
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (active.get()) {
+        try {
+          queue.put(new QueueEntry(Actions.write, entries));
+          LOG.debug("Queued {}", entries.size());
+          return true;
+        } catch (InterruptedException e) {
+          Thread.interrupted();

Review Comment:
   Log a message here saying that something went wrong (the enqueue was interrupted).



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file file
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    return createWriter(toPath(file));
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    return SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(FileEntry.class));
+  }
+
+
+  /**
+   * Reader is created with sequential reads.
+   * @param file file
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    return createReader(toPath(file));
+  }
+
+  /**
+   * Reader is created with sequential reads.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(path));
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * Closeable: cast and invoke to close the reader.
+   * @param reader reader;
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int 
capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } finally {
+      if (close) {
+        writer.close();
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   */
+  private static final class QueueEntry {
+
+    private final Actions action;
+
+    private final List<FileEntry> entries;
+
+    private QueueEntry(final Actions action, List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+  }
+
+  /**
+   * A Writer thread takes reads from a queue containing
+   * list of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * The these threads will be blocked when the queue capacity is reached.
+   * This is quite a complex process, with the main troublespots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in the 
process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * stop flag.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active.
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future invoked.
+     */
+    private Future<Integer> future;
+
+    /**
+     * count of file entries saved; only updated in one thread
+     * so volatile.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure = new 
AtomicReference<>();
+
+    /**
+     * Create.
+     * @param writer writer
+     * @param capacity capacity.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Is the writer active?
+     * @return true if the processor thread is live
+     */
+    public boolean isActive() {
+      return active.get();
+    }
+
+    /**
+     * Get count of files processed.
+     * @return the count
+     */
+    public int getCount() {
+      return count.get();
+    }
+
+    /**
+     * Any failure.
+     * @return any IOException caught when writing the output
+     */
+    public IOException getFailure() {
+      return failure.get();
+    }
+
+    /**
+     * Start the thread.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * @param entries entries.
+     * @return whether the queue worked.
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (active.get()) {
+        try {
+          queue.put(new QueueEntry(Actions.write, entries));
+          LOG.debug("Queued {}", entries.size());

Review Comment:
   Could this LOG message include some information about the entry that was queued?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CreateOutputDirectoriesStage.java:
##########
@@ -274,8 +270,8 @@ private void createOneDirectory(final DirEntry dirEntry) throws IOException {
    * Try to efficiently and robustly create a directory in a method which is
    * expected to be executed in parallel with operations creating
    * peer directories.
-   * @param path path to create
-   * @return true if dir created/found
+   * @param dirEntry dir to create
+   * @return Outcome

Review Comment:
   nit: could the `@return` javadoc be more descriptive, e.g. "State of the directory in the dir map" or something similar?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file file
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    return createWriter(toPath(file));
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    return SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(FileEntry.class));
+  }
+
+
+  /**
+   * Reader is created with sequential reads.
+   * @param file file
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    return createReader(toPath(file));
+  }
+
+  /**
+   * Reader is created with sequential reads.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(path));
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * Closeable: cast and invoke to close the reader.
+   * @param reader reader;
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } finally {
+      if (close) {
+        writer.close();
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   */
+  private static final class QueueEntry {
+
+    private final Actions action;
+
+    private final List<FileEntry> entries;
+
+    private QueueEntry(final Actions action, List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+  }
+
+  /**
+   * A Writer thread takes reads from a queue containing
+   * list of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * The these threads will be blocked when the queue capacity is reached.
+   * This is quite a complex process, with the main troublespots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in the process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * stop flag.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active.
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future invoked.
+     */
+    private Future<Integer> future;
+
+    /**
+     * count of file entries saved; only updated in one thread
+     * so volatile.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure = new AtomicReference<>();
+
+    /**
+     * Create.
+     * @param writer writer
+     * @param capacity capacity.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Is the writer active?
+     * @return true if the processor thread is live
+     */
+    public boolean isActive() {
+      return active.get();
+    }
+
+    /**
+     * Get count of files processed.
+     * @return the count
+     */
+    public int getCount() {
+      return count.get();
+    }
+
+    /**
+     * Any failure.
+     * @return any IOException caught when writing the output
+     */
+    public IOException getFailure() {
+      return failure.get();
+    }
+
+    /**
+     * Start the thread.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * @param entries entries.
+     * @return whether the queue worked.
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (active.get()) {
+        try {
+          queue.put(new QueueEntry(Actions.write, entries));
+          LOG.debug("Queued {}", entries.size());
+          return true;
+        } catch (InterruptedException e) {
+          Thread.interrupted();
+          return false;
+        }
+      } else {
+        LOG.debug("Queue inactive; discarding {} entries", entries.size());

Review Comment:
   Should this be logged at error level (`LOG.error()`)? How often do you expect this branch to be reached?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file file
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    return createWriter(toPath(file));
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    return SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(FileEntry.class));
+  }
+
+
+  /**
+   * Reader is created with sequential reads.
+   * @param file file
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    return createReader(toPath(file));
+  }
+
+  /**
+   * Reader is created with sequential reads.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(path));
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * Closeable: cast and invoke to close the reader.
+   * @param reader reader;
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } finally {
+      if (close) {
+        writer.close();
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   */
+  private static final class QueueEntry {
+
+    private final Actions action;
+
+    private final List<FileEntry> entries;
+
+    private QueueEntry(final Actions action, List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+  }
+
+  /**
+   * A Writer thread takes reads from a queue containing
+   * list of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * The these threads will be blocked when the queue capacity is reached.

Review Comment:
   typo: "The these" should be "Then these" (first word).



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RemoteIterators.java:
##########
@@ -191,6 +191,37 @@ public static <S> RemoteIterator<S> closingRemoteIterator(
     return new CloseRemoteIterator<>(iterator, toClose);
   }
 
+  /**
+   * Wrap an iterator with one which adds a continuation probe.
+   * This allows work to exit fast without complicated breakout logic
+   * @param iterator source
+   * @param continueWork predicate which will trigger a fast halt if it returns false.
+   * @param <S> source type.
+   * @return a new iterator
+   */
+  public static <S> RemoteIterator<S> haltableRemoteIterator(
+      final RemoteIterator<S> iterator,
+      final CallableRaisingIOE<Boolean> continueWork) {
+    return new HaltableRemoteIterator<>(iterator, continueWork);
+  }
+
+  /**
+   * A remote iterator which simply counts up, stopping once the
+   * value is greater than the value of {@code excludedFinish}.
+   * This is primarily for tests or when submitting work into a TaskPool.
+   * equivalent to
+   * <pre>
+   *   for(long l = start, l &lt; finis; l++) yield l;

Review Comment:
   typo: should be "l < excludedFinish".



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md:
##########
@@ -234,6 +239,36 @@ Caveats
   `mapreduce.manifest.committer.io.rate` can help avoid this.
 
 
+### `mapreduce.manifest.committer.writer.queue.capacity`
+
+This is a secondary scale option.
+It controls the size of the queue for storing lists of files to rename from
+the manifests loaded from the target filesystem, manifests loaded
+from a pool of worker threads, and the single thread which saves
+the entries from each manifest to an intermediate file in the local filesystem.
+
+Once the queue is full, all manifest loading threads will block.
+
+```xml
+<property>
+  <name>mapreduce.manifest.committer.writer.queue.capacity</name>
+  <value>32</value>
+</property>
+```
+
+As the local filesystem is usually much faster to write to than any cloud store,
+this queue size should not be a limit on manifest load performance.
+
+It can help limit the amount of memory consumed during manifest load during
+job commit.
+The maximumum number of loaded manifests will be

Review Comment:
   typo: "maximumum" should be "maximum".



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
+ */
+public class EntryFileIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntryFileIO.class);
+
+  /** Configuration used to load filesystems. */
+  private final Configuration conf;
+
+  /**
+   * Constructor.
+   * @param conf Configuration used to load filesystems
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file file
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    return createWriter(toPath(file));
+  }
+
+  /**
+   * Create a writer to a file on any FS.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    return SequenceFile.createWriter(conf,
+        SequenceFile.Writer.file(path),
+        SequenceFile.Writer.keyClass(NullWritable.class),
+        SequenceFile.Writer.valueClass(FileEntry.class));
+  }
+
+
+  /**
+   * Reader is created with sequential reads.
+   * @param file file
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    return createReader(toPath(file));
+  }
+
+  /**
+   * Reader is created with sequential reads.
+   * @param path path
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    return new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(path));
+  }
+
+  /**
+   * Iterator to retrieve file entries from the sequence file.
+   * Closeable: cast and invoke to close the reader.
+   * @param reader reader;
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    return new EntryIterator(reader);
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    try {
+      for (FileEntry entry : entries) {
+        writer.append(NullWritable.get(), entry);
+      }
+      writer.flush();
+    } finally {
+      if (close) {
+        writer.close();
+      }
+    }
+    return entries.size();
+  }
+
+
+  /**
+   * Given a file, create a Path.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    return new Path(file.toURI());
+  }
+
+
+  /**
+   * Actions in the queue.
+   */
+  private enum Actions {
+    /** Write the supplied list of entries. */
+    write,
+    /** Stop the processor thread. */
+    stop
+  }
+
+  /**
+   * What gets queued: an action and a list of entries.
+   */
+  private static final class QueueEntry {
+
+    private final Actions action;
+
+    private final List<FileEntry> entries;
+
+    private QueueEntry(final Actions action, List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+  }
+
+  /**
+   * A Writer thread takes reads from a queue containing
+   * list of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * The these threads will be blocked when the queue capacity is reached.
+   * This is quite a complex process, with the main troublespots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in the process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * stop flag.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active.
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future invoked.
+     */
+    private Future<Integer> future;
+
+    /**
+     * count of file entries saved; only updated in one thread
+     * so volatile.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure = new AtomicReference<>();
+
+    /**
+     * Create.
+     * @param writer writer
+     * @param capacity capacity.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Is the writer active?
+     * @return true if the processor thread is live
+     */
+    public boolean isActive() {
+      return active.get();
+    }
+
+    /**
+     * Get count of files processed.
+     * @return the count
+     */
+    public int getCount() {
+      return count.get();
+    }
+
+    /**
+     * Any failure.
+     * @return any IOException caught when writing the output
+     */
+    public IOException getFailure() {
+      return failure.get();
+    }
+
+    /**
+     * Start the thread.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * @param entries entries.
+     * @return whether the queue worked.
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (active.get()) {
+        try {
+          queue.put(new QueueEntry(Actions.write, entries));
+          LOG.debug("Queued {}", entries.size());
+          return true;
+        } catch (InterruptedException e) {
+          Thread.interrupted();
+          return false;
+        }
+      } else {
+        LOG.debug("Queue inactive; discarding {} entries", entries.size());
+        return false;
+      }
+    }
+
+    /**
+     * Queue and process entries until done.
+     * @return count of entries written.
+     * @throws UncheckedIOException on write failure
+     */
+    private int processor() {
+      Thread.currentThread().setName("EntryIOWriter");
+      try {
+        while (!stop.get()) {
+          final QueueEntry queueEntry = queue.take();

Review Comment:
   It seems like we could wait indefinitely on this.
   How about using `poll(long timeout, TimeUnit unit)` instead?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/EntryFileIO.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.FutureIO;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Read or write an entry file.
+ * This can be used to create a simple reader, or to create
+ * a writer queue where different threads can queue data for
+ * writing.
+ * The entry file is a SequenceFile with KV = {NullWritable, FileEntry}.
+ */
+public class EntryFileIO {
+
+  /** Log for this class and its nested writer. */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(EntryFileIO.class);
+
+  /**
+   * Configuration passed to every reader and writer created,
+   * and so used when loading their filesystems.
+   */
+  private final Configuration conf;
+
+  /**
+   * Instantiate the entry file IO helper.
+   * @param conf configuration used when creating sequence file
+   * readers and writers
+   */
+  public EntryFileIO(final Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Create a writer to a local file.
+   * @param file local file to write to
+   * @return a sequence file writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(File file) throws IOException {
+    final Path destination = toPath(file);
+    return createWriter(destination);
+  }
+
+  /**
+   * Create a writer to a file on any filesystem.
+   * Keys are NullWritable; values are FileEntry instances.
+   * @param path path to write to.
+   * @return the writer
+   * @throws IOException failure to create the file
+   */
+  public SequenceFile.Writer createWriter(Path path) throws IOException {
+    final SequenceFile.Writer.Option target = SequenceFile.Writer.file(path);
+    final SequenceFile.Writer.Option keyType =
+        SequenceFile.Writer.keyClass(NullWritable.class);
+    final SequenceFile.Writer.Option valueType =
+        SequenceFile.Writer.valueClass(FileEntry.class);
+    return SequenceFile.createWriter(conf, target, keyType, valueType);
+  }
+
+
+  /**
+   * Open a reader on a local file; the reader is created for
+   * sequential reads.
+   * @param file local file to read
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(File file) throws IOException {
+    final Path source = toPath(file);
+    return createReader(source);
+  }
+
+  /**
+   * Open a reader on a file on any filesystem; the reader is created
+   * for sequential reads.
+   * @param path path of the file to read
+   * @return the reader
+   * @throws IOException failure to open
+   */
+  public SequenceFile.Reader createReader(Path path) throws IOException {
+    final SequenceFile.Reader.Option source = SequenceFile.Reader.file(path);
+    return new SequenceFile.Reader(conf, source);
+  }
+
+  /**
+   * Create an iterator over the file entries in a sequence file.
+   * The iterator is Closeable: cast and invoke close() to close the reader.
+   * @param reader reader to read from
+   * @return iterator
+   */
+  public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
+    final RemoteIterator<FileEntry> entries = new EntryIterator(reader);
+    return entries;
+  }
+
+  /**
+   * Create and start an entry writer.
+   * @param writer writer
+   * @param capacity queue capacity
+   * @return the writer.
+   */
+  public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int 
capacity) {
+    final EntryWriter ew = new EntryWriter(writer, capacity);
+    ew.start();
+    return ew;
+  }
+
+  /**
+   * Write a sequence of entries to the writer.
+   * The entries are appended in iteration order, then the writer
+   * is flushed.
+   * @param writer writer
+   * @param entries entries
+   * @param close close the stream afterwards, even if the write failed.
+   * If closing fails after a write failure, the close failure is added
+   * to the write failure as a suppressed exception rather than replacing it.
+   * @return number of entries written
+   * @throws IOException write failure.
+   */
+  public static int write(SequenceFile.Writer writer,
+      Collection<FileEntry> entries,
+      boolean close)
+      throws IOException {
+    if (close) {
+      // try-with-resources closes the writer on every path and
+      // keeps the first failure as the one raised.
+      try (SequenceFile.Writer w = writer) {
+        appendAndFlush(w, entries);
+      }
+    } else {
+      appendAndFlush(writer, entries);
+    }
+    return entries.size();
+  }
+
+  /**
+   * Append all entries to a writer, then flush it.
+   * @param writer writer
+   * @param entries entries
+   * @throws IOException write failure.
+   */
+  private static void appendAndFlush(SequenceFile.Writer writer,
+      Collection<FileEntry> entries) throws IOException {
+    for (FileEntry entry : entries) {
+      writer.append(NullWritable.get(), entry);
+    }
+    writer.flush();
+  }
+
+
+  /**
+   * Convert a local file into a Hadoop path, via the file's URI.
+   * @param file file
+   * @return path to the file
+   */
+  public static Path toPath(final File file) {
+    final Path path = new Path(file.toURI());
+    return path;
+  }
+
+
+  /**
+   * Operations which can be placed on the writer queue.
+   */
+  private enum Actions {
+
+    /** Append the entries in the queued list to the output. */
+    write,
+
+    /** Tell the processor thread to finish. */
+    stop
+  }
+
+  /**
+   * An element of the writer queue: the action to perform and,
+   * for a write, the entries to write.
+   */
+  private static final class QueueEntry {
+
+    /** What to do. */
+    private final Actions action;
+
+    /** Entries to write; null for actions which carry no data. */
+    private final List<FileEntry> entries;
+
+    /**
+     * Create an entry with no data.
+     * @param action action
+     */
+    private QueueEntry(final Actions action) {
+      this(action, null);
+    }
+
+    /**
+     * Create an entry.
+     * @param action action
+     * @param entries entries; may be null
+     */
+    private QueueEntry(final Actions action, final List<FileEntry> entries) {
+      this.action = action;
+      this.entries = entries;
+    }
+  }
+
+  /**
+   * A writer thread reads from a queue containing
+   * lists of entries to save; these are serialized via the writer to
+   * the output stream.
+   * Other threads can queue the file entry lists from loaded manifests
+   * for them to be written.
+   * These threads will be blocked when the queue capacity is reached.
+   * This is quite a complex process, with the main trouble spots in the code
+   * being:
+   * - managing the shutdown
+   * - failing safely on write failures, restarting all blocked writers in
+   *   the process
+   */
+  public static final class EntryWriter implements Closeable {
+
+    /**
+     * The destination of the output.
+     */
+    private final SequenceFile.Writer writer;
+
+    /**
+     * Blocking queue of actions.
+     */
+    private final BlockingQueue<QueueEntry> queue;
+
+    /**
+     * Stop flag: once set, the processor thread exits its loop.
+     */
+    private final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /**
+     * Is the processor thread active?
+     * Set when the writer is started; cleared when the processor
+     * thread exits or the writer is closed.
+     */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /**
+     * Executor of writes; null until the writer is started.
+     */
+    private ExecutorService executor;
+
+    /**
+     * Future of the processor task; its result is the count of
+     * entries written.
+     */
+    private Future<Integer> future;
+
+    /**
+     * Count of file entries saved.
+     * Only updated in the processor thread, but it may be read from
+     * other threads, so it is an atomic rather than a plain int.
+     */
+    private final AtomicInteger count = new AtomicInteger();
+
+    /**
+     * Any failure caught on the writer thread; this should be
+     * raised within the task/job thread as it implies that the
+     * entire write has failed.
+     */
+    private final AtomicReference<IOException> failure =
+        new AtomicReference<>();
+
+    /**
+     * Create.
+     * @param writer writer
+     * @param capacity capacity.
+     */
+    private EntryWriter(SequenceFile.Writer writer, int capacity) {
+      checkState(capacity > 0, "invalid queue capacity %s", capacity);
+      this.writer = requireNonNull(writer);
+      this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Query whether the processor thread is running.
+     * @return true from start() until the processor thread exits
+     * or the writer is closed.
+     */
+    public boolean isActive() {
+      final boolean running = active.get();
+      return running;
+    }
+
+    /**
+     * Number of file entries appended to the output so far.
+     * @return the count
+     */
+    public int getCount() {
+      final int appended = count.get();
+      return appended;
+    }
+
+    /**
+     * Get the failure raised while writing, if any.
+     * @return the IOException caught by the processor thread,
+     * or null if there has been none.
+     */
+    public IOException getFailure() {
+      final IOException caught = failure.get();
+      return caught;
+    }
+
+    /**
+     * Start the thread.
+     */
+    private void start() {
+      checkState(executor == null, "already started");
+      active.set(true);
+      executor = HadoopExecutors.newSingleThreadExecutor();
+      future = executor.submit(this::processor);
+      LOG.debug("Started entry writer {}", this);
+    }
+
+    /**
+     * Add a list of entries to the queue.
+     * Blocks while the queue is full.
+     * @param entries entries.
+     * @return true if the entries were queued or the list was empty;
+     * false if the writer is inactive or the calling thread was
+     * interrupted while waiting for space in the queue.
+     */
+    public boolean enqueue(List<FileEntry> entries) {
+      if (entries.isEmpty()) {
+        LOG.debug("ignoring enqueue of empty list");
+        // exit fast, but return true.
+        return true;
+      }
+      if (!active.get()) {
+        LOG.debug("Queue inactive; discarding {} entries", entries.size());
+        return false;
+      }
+      // NOTE(review): the active check and put() are not atomic; entries
+      // queued just as the processor thread exits may never be written.
+      try {
+        queue.put(new QueueEntry(Actions.write, entries));
+        LOG.debug("Queued {}", entries.size());
+        return true;
+      } catch (InterruptedException e) {
+        // restore the interrupt status for the caller;
+        // Thread.interrupted() would have cleared it instead.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+
+    /**
+     * Queue and process entries until done.
+     * Runs in the executor thread created by start(): actions are taken
+     * off the queue until a stop action is received, the thread is
+     * interrupted, or a write fails.
+     * On exit, for whatever reason, the writer is marked as stopped and
+     * inactive, and any remaining queued entries are discarded.
+     * @return count of entries written.
+     * @throws UncheckedIOException on write failure; the underlying
+     * IOException is also recorded and returned by getFailure().
+     */
+    private int processor() {
+      Thread.currentThread().setName("EntryIOWriter");
+      try {
+        while (!stop.get()) {
+          final QueueEntry queueEntry = queue.take();
+          switch (queueEntry.action) {
+
+          case stop:  // stop the operation
+            LOG.debug("Stop processing");
+            stop.set(true);
+            break;
+
+          case write:  // write data
+          default:  // here to shut compiler up
+            // write
+            final List<FileEntry> entries = queueEntry.entries;
+            LOG.debug("Adding block of {} entries", entries.size());
+            for (FileEntry entry : entries) {
+              append(entry);
+            }
+            break;
+          }
+        }
+      } catch (IOException e) {
+        LOG.debug("Write failure", e);
+        failure.set(e);
+        throw new UncheckedIOException(e);
+      } catch (InterruptedException e) {
+        // being stopped implicitly.
+        // Restore the interrupt status rather than swallowing it.
+        LOG.debug("interrupted", e);
+        Thread.currentThread().interrupt();
+      } finally {
+        stop.set(true);
+        active.set(false);
+        // clear the queue, so wake up on any failure mode:
+        // this frees space for any producer blocked in put().
+        queue.clear();
+      }
+      return count.get();
+    }
+
+    /**
+     * Write one entry to the output, then increment the count.
+     * The count is only updated once the append has succeeded.
+     * @param entry entry to write
+     * @throws IOException on write failure
+     */
+    private void append(FileEntry entry) throws IOException {
+      writer.append(NullWritable.get(), entry);
+      final int written = count.incrementAndGet();
+      LOG.trace("Added entry #{}: {}", written, entry);
+    }
+
+    /**
+     * Close: stop accepting new writes, wait for queued writes to complete.
+     * @throws IOException failure closing that writer, or somehow the future
+     * raises an IOE which isn't caught for later.
+     */
+    @Override
+    public void close() throws IOException {
+
+      // declare as inactive.
+      // this stops queueing more data, but leaves
+      // the worker thread still polling and writing.
+      if (!active.getAndSet(false)) {
+        // already stopped
+        return;
+      }
+      LOG.debug("Shutting down writer");
+      // signal queue closure by queuing a stop option.
+      // this is added at the end of the list of queued blocks,
+      // of which are written.
+      try {

Review Comment:
   Log something like "Tasks left in queue = `capacity - queue.remainingCapacity()`"
   for better diagnostics. We could do the same when offering entries to the
   queue as well, but it seems most apt for close().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to