This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4249c04  HADOOP-16798. S3A Committer thread pool shutdown problems. 
(#1963)
4249c04 is described below

commit 4249c04d454ca82aadeed152ab777e93474754ab
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Tue Jun 30 10:44:51 2020 +0100

    HADOOP-16798. S3A Committer thread pool shutdown problems. (#1963)
    
    
    
    Contributed by Steve Loughran.
    
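    Fixes a condition which can cause job commit to fail if a task was
    aborted less than 60s before the job commit commenced: the task abort
    shuts down the thread pool with a hard exit after 60s, while the
    job commit POST requests are scheduled through the same pool, so
    they are interrupted and fail. Access to the pool was already
    synchronized, but presumably the executor shutdown code calls wait()
    and so releases the lock.

    The fix: destroyThreadPool() now detaches the pool inside a
    synchronized block and shuts it down outside it, and commit work is
    submitted through a Tasks.Submitter which creates a new pool on
    demand if the previous one has been destroyed.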
    
    Task abort is triggered from the AM when task attempts succeed but
    there are still active speculative task attempts running. Thus it
    only surfaces when speculation is enabled and the final tasks are
    speculating, which, given they are the stragglers, is not unheard of.
    
    Note: this problem has never been seen in production; it has only
    surfaced in the hadoop-aws tests on a heavily overloaded desktop.
---
 .../hadoop/fs/s3a/commit/AbstractS3ACommitter.java | 129 ++++++++++++++++-----
 .../org/apache/hadoop/fs/s3a/commit/Tasks.java     |  23 +++-
 .../staging/PartitionedStagingCommitter.java       |   7 +-
 .../fs/s3a/commit/staging/StagingCommitter.java    |   2 +-
 .../org/apache/hadoop/fs/s3a/commit/TestTasks.java |  21 +++-
 5 files changed, 142 insertions(+), 40 deletions(-)

diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
index e82fbda..32d00a4 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -472,7 +473,7 @@ public abstract class AbstractS3ACommitter extends 
PathOutputCommitter {
       Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
           .abortWith(path ->
               loadAndAbort(commitContext, pending, path, true, false))
           .revertWith(path ->
@@ -502,7 +503,7 @@ public abstract class AbstractS3ACommitter extends 
PathOutputCommitter {
       Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(buildThreadPool(context))
+          .executeWith(buildSubmitter(context))
           .run(path -> PendingSet.load(sourceFS, path));
     }
   }
@@ -525,7 +526,7 @@ public abstract class AbstractS3ACommitter extends 
PathOutputCommitter {
       Tasks.foreach(pendingSet.getCommits())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(singleCommitThreadPool())
+          .executeWith(singleThreadSubmitter())
           .onFailure((commit, exception) ->
               commitContext.abortSingleCommit(commit))
           .abortWith(commitContext::abortSingleCommit)
@@ -580,7 +581,7 @@ public abstract class AbstractS3ACommitter extends 
PathOutputCommitter {
           path);
       FileSystem fs = getDestFS();
       Tasks.foreach(pendingSet.getCommits())
-          .executeWith(singleCommitThreadPool())
+          .executeWith(singleThreadSubmitter())
           .suppressExceptions(suppressExceptions)
           .run(commit -> {
             try {
@@ -674,7 +675,7 @@ public abstract class AbstractS3ACommitter extends 
PathOutputCommitter {
         return;
       }
       Tasks.foreach(pending)
-          .executeWith(buildThreadPool(getJobContext()))
+          .executeWith(buildSubmitter(getJobContext()))
           .suppressExceptions(suppressExceptions)
           .run(u -> commitContext.abortMultipartCommit(
               u.getKey(), u.getUploadId()));
@@ -838,44 +839,116 @@ public abstract class AbstractS3ACommitter extends 
PathOutputCommitter {
   }
 
   /**
-   * Returns an {@link ExecutorService} for parallel tasks. The number of
+   * Returns an {@link Tasks.Submitter} for parallel tasks. The number of
    * threads in the thread-pool is set by fs.s3a.committer.threads.
    * If num-threads is 0, this will return null;
+   * this is used in Tasks as a cue
+   * to switch to single-threaded execution.
    *
    * @param context the JobContext for this commit
-   * @return an {@link ExecutorService} or null for the number of threads
+   * @return a submitter or null
    */
-  protected final synchronized ExecutorService buildThreadPool(
+  protected Tasks.Submitter buildSubmitter(
       JobContext context) {
+    if (getThreadCount(context) > 0) {
+      return new PoolSubmitter(context);
+    } else {
+      return null;
+    }
+  }
 
+  /**
+   * Returns an {@link ExecutorService} for parallel tasks, creating it on
+   * demand with {@code numThreads} threads if it does not yet exist.
+   * If numThreads is not positive, an IllegalArgumentException is raised.
+   *
+   * @param context the JobContext for this commit
+   * @param numThreads threads
+   * @return an {@link ExecutorService} for the number of threads
+   */
+  private synchronized ExecutorService buildThreadPool(
+      JobContext context, int numThreads) {
+    Preconditions.checkArgument(numThreads > 0,
+        "Cannot create a thread pool with no threads");
     if (threadPool == null) {
-      int numThreads = context.getConfiguration().getInt(
-          FS_S3A_COMMITTER_THREADS,
-          DEFAULT_COMMITTER_THREADS);
       LOG.debug("{}: creating thread pool of size {}", getRole(), numThreads);
-      if (numThreads > 0) {
-        threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
-            new ThreadFactoryBuilder()
-                .setDaemon(true)
-                .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
-                .build());
-      } else {
-        return null;
-      }
+      threadPool = HadoopExecutors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat(THREAD_PREFIX + context.getJobID() + "-%d")
+              .build());
     }
     return threadPool;
   }
 
   /**
+   * Get the thread count for this job's commit operations.
+   * @param context the JobContext for this commit
+   * @return a possibly zero thread count.
+   */
+  private int getThreadCount(final JobContext context) {
+    return context.getConfiguration().getInt(
+        FS_S3A_COMMITTER_THREADS,
+        DEFAULT_COMMITTER_THREADS);
+  }
+
+  /**
+   * Submit a runnable.
+   * This will demand-create the thread pool if needed.
+   * <p></p>
+   * This is synchronized to ensure the thread pool is always valid when
+   * work is submitted. See HADOOP-16798.
+   * @param context the JobContext for this commit
+   * @param task task to execute
+   * @return the future of the submitted task.
+   */
+  private synchronized Future<?> submitRunnable(
+      final JobContext context,
+      final Runnable task) {
+    return buildThreadPool(context, getThreadCount(context)).submit(task);
+  }
+
+  /**
+   * The real task submitter, which hands off the work to
+   * the current thread pool.
+   */
+  private final class PoolSubmitter implements Tasks.Submitter {
+
+    private final JobContext context;
+
+    private final int numThreads;
+
+    private PoolSubmitter(final JobContext context) {
+      this.numThreads = getThreadCount(context);
+      Preconditions.checkArgument(numThreads > 0,
+          "Cannot create a thread pool with no threads");
+      this.context = context;
+    }
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+      return submitRunnable(context, task);
+    }
+
+  }
+
+  /**
    * Destroy any thread pools; wait for that to finish,
    * but don't overreact if it doesn't finish in time.
    */
-  protected synchronized void destroyThreadPool() {
-    if (threadPool != null) {
+  protected void destroyThreadPool() {
+    ExecutorService pool;
+    // reset the thread pool in a sync block, then shut it down
+    // afterwards. This allows for other threads to create a
+    // new thread pool on demand.
+    synchronized(this) {
+      pool = this.threadPool;
+      threadPool = null;
+    }
+    if (pool != null) {
       LOG.debug("Destroying thread pool");
-      HadoopExecutors.shutdown(threadPool, LOG,
+      HadoopExecutors.shutdown(pool, LOG,
           THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
-      threadPool = null;
     }
   }
 
@@ -884,11 +957,9 @@ public abstract class AbstractS3ACommitter extends 
PathOutputCommitter {
    * within the commit of all uploads of a single task.
    * This is currently null; it is here to allow the Tasks class to
    * provide the logic for execute/revert.
-   * Why not use the existing thread pool? Too much fear of deadlocking,
-   * and tasks are being committed in parallel anyway.
    * @return null. always.
    */
-  protected final synchronized ExecutorService singleCommitThreadPool() {
+  protected final synchronized Tasks.Submitter singleThreadSubmitter() {
     return null;
   }
 
@@ -932,7 +1003,7 @@ public abstract class AbstractS3ACommitter extends 
PathOutputCommitter {
            CommitOperations.CommitContext commitContext
                = initiateCommitOperation()) {
         Tasks.foreach(pending)
-            .executeWith(buildThreadPool(context))
+            .executeWith(buildSubmitter(context))
             .suppressExceptions(suppressExceptions)
             .run(commitContext::abortSingleCommit);
       }
@@ -961,7 +1032,7 @@ public abstract class AbstractS3ACommitter extends 
PathOutputCommitter {
            CommitOperations.CommitContext commitContext
                = initiateCommitOperation()) {
         Tasks.foreach(pending.getSourceFiles())
-            .executeWith(buildThreadPool(context))
+            .executeWith(buildSubmitter(context))
             .suppressExceptions(suppressExceptions)
             .run(path ->
                 loadAndAbort(commitContext,
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
index b6b6b97..c318e86 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Tasks.java
@@ -25,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -76,7 +75,7 @@ public final class Tasks {
    */
   public static class Builder<I> {
     private final Iterable<I> items;
-    private ExecutorService service = null;
+    private Submitter service = null;
     private FailureTask<I, ?> onFailure = null;
     private boolean stopOnFailure = false;
     private boolean suppressExceptions = false;
@@ -96,11 +95,11 @@ public final class Tasks {
     /**
      * Declare executor service: if null, the tasks are executed in a single
      * thread.
-     * @param executorService service to schedule tasks with.
+     * @param submitter service to schedule tasks with.
      * @return this builder.
      */
-    public Builder<I> executeWith(ExecutorService executorService) {
-      this.service = executorService;
+    public Builder<I> executeWith(Submitter submitter) {
+      this.service = submitter;
       return this;
     }
 
@@ -407,4 +406,18 @@ public final class Tasks {
     }
     throw (E) e;
   }
+
+  /**
+   * Interface to whatever lets us submit tasks.
+   */
+  public interface Submitter {
+
+    /**
+     * Submit work.
+     * @param task task to execute
+     * @return the future of the submitted task.
+     */
+    Future<?> submit(Runnable task);
+  }
+
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
index 20aca3c..7be5406 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitter.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -187,7 +186,7 @@ public class PartitionedStagingCommitter extends 
StagingCommitter {
 
     Map<Path, String> partitions = new ConcurrentHashMap<>();
     FileSystem sourceFS = pending.getSourceFS();
-    ExecutorService pool = buildThreadPool(context);
+    Tasks.Submitter submitter = buildSubmitter(context);
     try (DurationInfo ignored =
              new DurationInfo(LOG, "Replacing partitions")) {
 
@@ -198,7 +197,7 @@ public class PartitionedStagingCommitter extends 
StagingCommitter {
       Tasks.foreach(pending.getSourceFiles())
           .stopOnFailure()
           .suppressExceptions(false)
-          .executeWith(pool)
+          .executeWith(submitter)
           .run(path -> {
             PendingSet pendingSet = PendingSet.load(sourceFS, path);
             Path lastParent = null;
@@ -216,7 +215,7 @@ public class PartitionedStagingCommitter extends 
StagingCommitter {
     Tasks.foreach(partitions.keySet())
         .stopOnFailure()
         .suppressExceptions(false)
-        .executeWith(pool)
+        .executeWith(submitter)
         .run(partitionPath -> {
           LOG.debug("{}: removing partition path to be replaced: " +
               getRole(), partitionPath);
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 7eca1b4..91e68af 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -699,7 +699,7 @@ public class StagingCommitter extends AbstractS3ACommitter {
         Tasks.foreach(taskOutput)
             .stopOnFailure()
             .suppressExceptions(false)
-            .executeWith(buildThreadPool(context))
+            .executeWith(buildSubmitter(context))
             .run(stat -> {
               Path path = stat.getPath();
               File localFile = new File(path.toUri().getPath());
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
index 4ee39f1..4211c62 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestTasks.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -57,6 +58,12 @@ public class TestTasks extends HadoopTestBase {
    * Thread pool for task execution.
    */
   private ExecutorService threadPool;
+
+  /**
+   * Task submitter bonded to the thread pool, or
+   * null for the 0-thread case.
+   */
+  Tasks.Submitter submitter;
   private final CounterTask failingTask
       = new CounterTask("failing committer", FAILPOINT, Item::commit);
 
@@ -117,6 +124,9 @@ public class TestTasks extends HadoopTestBase {
               .setDaemon(true)
               .setNameFormat(getMethodName() + "-pool-%d")
               .build());
+      submitter = new PoolSubmitter();
+    } else {
+      submitter = null;
     }
 
   }
@@ -129,12 +139,21 @@ public class TestTasks extends HadoopTestBase {
     }
   }
 
+  private class PoolSubmitter implements Tasks.Submitter {
+
+    @Override
+    public Future<?> submit(final Runnable task) {
+      return threadPool.submit(task);
+    }
+
+  }
+
   /**
    * create the builder.
    * @return pre-inited builder
    */
   private Tasks.Builder<Item> builder() {
-    return Tasks.foreach(items).executeWith(threadPool);
+    return Tasks.foreach(items).executeWith(submitter);
   }
 
   private void assertRun(Tasks.Builder<Item> builder,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to