YARN-8990. Fix fair scheduler race condition in app submit and queue cleanup. 
(Contributed by Wilfred Spiegelenburg)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/524a7523
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/524a7523
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/524a7523

Branch: refs/heads/HDFS-13891
Commit: 524a7523c427b55273133078898ae3535897bada
Parents: 89b4916
Author: Haibo Chen <haiboc...@apache.org>
Authored: Thu Nov 8 16:02:48 2018 -0800
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Thu Nov 8 16:02:48 2018 -0800

----------------------------------------------------------------------
 .../scheduler/fair/FSLeafQueue.java             |  14 +++
 .../scheduler/fair/FairScheduler.java           |  19 +++-
 .../scheduler/fair/QueueManager.java            | 113 +++++++++++++------
 3 files changed, 104 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/524a7523/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 7e4dab8..a038887 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -651,4 +651,18 @@ public class FSLeafQueue extends FSQueue {
       writeLock.unlock();
     }
   }
+
+  /**
+   * This method is called when an application is removed from this queue
+   * during the submit process.
+   * @param applicationId the application's id
+   */
+  public void removeAssignedApp(ApplicationId applicationId) {
+    writeLock.lock();
+    try {
+      assignedApps.remove(applicationId);
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/524a7523/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index da5e4c9..e5d2a06 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -473,7 +473,7 @@ public class FairScheduler extends
     writeLock.lock();
     try {
       RMApp rmApp = rmContext.getRMApps().get(applicationId);
-      FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
+      FSLeafQueue queue = assignToQueue(rmApp, queueName, user, applicationId);
       if (queue == null) {
         return;
       }
@@ -499,6 +499,7 @@ public class FairScheduler extends
                   applicationId, queue.getName(),
                   invalidAMResourceRequests, queue.getMaxShare());
           rejectApplicationWithMessage(applicationId, msg);
+          queue.removeAssignedApp(applicationId);
           return;
         }
       }
@@ -513,6 +514,7 @@ public class FairScheduler extends
             + " cannot submit applications to queue " + queue.getName()
             + "(requested queuename is " + queueName + ")";
         rejectApplicationWithMessage(applicationId, msg);
+        queue.removeAssignedApp(applicationId);
         return;
       }
 
@@ -520,7 +522,6 @@ public class FairScheduler extends
           new SchedulerApplication<FSAppAttempt>(queue, user);
       applications.put(applicationId, application);
       queue.getMetrics().submitApp(user);
-      queue.addAssignedApp(applicationId);
 
       LOG.info("Accepted application " + applicationId + " from user: " + user
           + ", in queue: " + queue.getName()
@@ -597,11 +598,19 @@ public class FairScheduler extends
   }
 
   /**
-   * Helper method that attempts to assign the app to a queue. The method is
-   * responsible to call the appropriate event-handler if the app is rejected.
+   * Helper method for the tests to assign the app to a queue.
    */
   @VisibleForTesting
   FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
+    return assignToQueue(rmApp, queueName, user, null);
+  }
+
+  /**
+   * Helper method that attempts to assign the app to a queue. The method is
+   * responsible to call the appropriate event-handler if the app is rejected.
+   */
+  private FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user,
+        ApplicationId applicationId) {
     FSLeafQueue queue = null;
     String appRejectMsg = null;
 
@@ -611,7 +620,7 @@ public class FairScheduler extends
       if (queueName == null) {
         appRejectMsg = "Application rejected by queue placement policy";
       } else {
-        queue = queueMgr.getLeafQueue(queueName, true);
+        queue = queueMgr.getLeafQueue(queueName, true, applicationId);
         if (queue == null) {
           appRejectMsg = queueName + " is not a leaf queue";
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/524a7523/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 8371765..2ca32c3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.xml.sax.SAXException;
@@ -71,7 +72,7 @@ public class QueueManager {
       Boolean removed =
           removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null);
       if (Boolean.TRUE.equals(removed)) {
-        FSQueue queue = getQueue(queueToCreate, true, queueType, false);
+        FSQueue queue = getQueue(queueToCreate, true, queueType, false, null);
         if (queue != null &&
             // if queueToCreate is present in the allocation config, set it
             // to static
@@ -124,30 +125,49 @@ public class QueueManager {
 
   /**
    * Get a leaf queue by name, creating it if the create param is
-   * true and is necessary.
-   * If the queue is not or can not be a leaf queue, i.e. it already exists as a
-   * parent queue, or one of the parents in its name is already a leaf queue,
-   * null is returned.
+   * <code>true</code> and the queue does not exist.
+   * If the queue is not or can not be a leaf queue, i.e. it already exists as
+   * a parent queue, or one of the parents in its name is already a leaf queue,
+   * <code>null</code> is returned.
    * 
    * The root part of the name is optional, so a queue underneath the root 
    * named "queue1" could be referred to  as just "queue1", and a queue named
    * "queue2" underneath a parent named "parent1" that is underneath the root 
    * could be referred to as just "parent1.queue2".
+   * @param name name of the queue
+   * @param create <code>true</code> if the queue must be created if it does
+   *               not exist, <code>false</code> otherwise
+   * @return the leaf queue or <code>null</code> if the queue cannot be found
    */
   public FSLeafQueue getLeafQueue(String name, boolean create) {
-    return getLeafQueue(name, create, true);
+    return getLeafQueue(name, create, null, true);
   }
 
-  private FSLeafQueue getLeafQueue(
-      String name,
-      boolean create,
-      boolean recomputeSteadyShares) {
-    FSQueue queue = getQueue(
-        name,
-        create,
-        FSQueueType.LEAF,
-        recomputeSteadyShares
-    );
+  /**
+   * Get a leaf queue by name, creating it if the create param is
+   * <code>true</code> and the queue does not exist.
+   * If the queue is not or can not be a leaf queue, i.e. it already exists as
+   * a parent queue, or one of the parents in its name is already a leaf queue,
+   * <code>null</code> is returned.
+   *
+   * The application will be assigned to the queue if the applicationId is
+   * not <code>null</code>.
+   * @param name name of the queue
+   * @param create <code>true</code> if the queue must be created if it does
+   *               not exist, <code>false</code> otherwise
+   * @param applicationId the application ID to assign to the queue
+   * @return the leaf queue or <code>null</code> if the queue cannot be found
+   */
+  public FSLeafQueue getLeafQueue(String name, boolean create,
+                                  ApplicationId applicationId) {
+    return getLeafQueue(name, create, applicationId, true);
+  }
+
+  private FSLeafQueue getLeafQueue(String name, boolean create,
+                                   ApplicationId applicationId,
+                                   boolean recomputeSteadyShares) {
+    FSQueue queue = getQueue(name, create, FSQueueType.LEAF,
+        recomputeSteadyShares, applicationId);
     if (queue instanceof FSParentQueue) {
       return null;
     }
@@ -168,42 +188,55 @@ public class QueueManager {
 
   /**
    * Get a parent queue by name, creating it if the create param is
-   * true and is necessary.
-   * If the queue is not or can not be a parent queue,
-   * i.e. it already exists as a
-   * leaf queue, or one of the parents in its name is already a leaf queue,
-   * null is returned.
+   * <code>true</code> and the queue does not exist.
+   * If the queue is not or can not be a parent queue, i.e. it already exists
+   * as a leaf queue, or one of the parents in its name is already a leaf
+   * queue, <code>null</code> is returned.
    * 
    * The root part of the name is optional, so a queue underneath the root 
    * named "queue1" could be referred to  as just "queue1", and a queue named
    * "queue2" underneath a parent named "parent1" that is underneath the root 
    * could be referred to as just "parent1.queue2".
+   * @param name name of the queue
+   * @param create <code>true</code> if the queue must be created if it does
+   *               not exist, <code>false</code> otherwise
+   * @return the parent queue or <code>null</code> if the queue cannot be found
    */
   public FSParentQueue getParentQueue(String name, boolean create) {
     return getParentQueue(name, create, true);
   }
 
-  public FSParentQueue getParentQueue(
-      String name,
-      boolean create,
+  /**
+   * Get a parent queue by name, creating it if the create param is
+   * <code>true</code> and the queue does not exist.
+   * If the queue is not or can not be a parent queue, i.e. it already exists
+   * as a leaf queue, or one of the parents in its name is already a leaf
+   * queue, <code>null</code> is returned.
+   *
+   * The root part of the name is optional, so a queue underneath the root
+   * named "queue1" could be referred to  as just "queue1", and a queue named
+   * "queue2" underneath a parent named "parent1" that is underneath the root
+   * could be referred to as just "parent1.queue2".
+   * @param name name of the queue
+   * @param create <code>true</code> if the queue must be created if it does
+   *               not exist, <code>false</code> otherwise
+   * @param recomputeSteadyShares <code>true</code> if the steady fair share
+   *                              should be recalculated when a queue is added,
+   *                              <code>false</code> otherwise
+   * @return the parent queue or <code>null</code> if the queue cannot be found
+   */
+  public FSParentQueue getParentQueue(String name, boolean create,
       boolean recomputeSteadyShares) {
-    FSQueue queue = getQueue(
-        name,
-        create,
-        FSQueueType.PARENT,
-        recomputeSteadyShares
-    );
+    FSQueue queue = getQueue(name, create, FSQueueType.PARENT,
+        recomputeSteadyShares, null);
     if (queue instanceof FSLeafQueue) {
       return null;
     }
     return (FSParentQueue) queue;
   }
 
-  private FSQueue getQueue(
-      String name,
-      boolean create,
-      FSQueueType queueType,
-      boolean recomputeSteadyShares) {
+  private FSQueue getQueue(String name, boolean create, FSQueueType queueType,
+      boolean recomputeSteadyShares, ApplicationId applicationId) {
     boolean recompute = recomputeSteadyShares;
     name = ensureRootPrefix(name);
     FSQueue queue;
@@ -215,8 +248,14 @@ public class QueueManager {
       } else {
         recompute = false;
       }
+      // At this point the queue exists and we need to assign the app to it,
+      // but only if it is a leaf queue
+      if (applicationId != null && queue instanceof FSLeafQueue) {
+        ((FSLeafQueue)queue).addAssignedApp(applicationId);
+      }
     }
-    if (recompute) {
+    // Don't recompute if it is an existing queue or no change was made
+    if (recompute && queue != null) {
       rootQueue.recomputeSteadyShares();
     }
     return queue;
@@ -614,7 +653,7 @@ public class QueueManager {
         incompatibleQueuesPendingRemoval.add(
             new IncompatibleQueueRemovalTask(name, queueType));
       } else {
-        FSQueue queue = getQueue(name, true, queueType, false);
+        FSQueue queue = getQueue(name, true, queueType, false, null);
         if (queue != null) {
           queue.setDynamic(false);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to