YARN-8191. Fair scheduler: queue deletion without RM restart. (Gergo Repas via 
Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86bc6425
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86bc6425
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86bc6425

Branch: refs/heads/HDDS-48
Commit: 86bc6425d425913899f1d951498bd040e453b3d0
Parents: d9852eb
Author: Haibo Chen <haiboc...@apache.org>
Authored: Thu May 24 17:07:21 2018 -0700
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Thu May 24 17:12:34 2018 -0700

----------------------------------------------------------------------
 .../fair/AllocationFileLoaderService.java       |  16 +-
 .../scheduler/fair/FSLeafQueue.java             |  31 ++
 .../resourcemanager/scheduler/fair/FSQueue.java |   9 +
 .../scheduler/fair/FairScheduler.java           |  29 +-
 .../scheduler/fair/QueueManager.java            | 155 +++++++--
 .../fair/TestAllocationFileLoaderService.java   | 100 +++---
 .../scheduler/fair/TestQueueManager.java        | 337 +++++++++++++++++++
 7 files changed, 596 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index d8d9051..7a40b6a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -87,7 +87,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
   private Path allocFile;
   private FileSystem fs;
 
-  private Listener reloadListener;
+  private final Listener reloadListener;
 
   @VisibleForTesting
   long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
@@ -95,15 +95,16 @@ public class AllocationFileLoaderService extends 
AbstractService {
   private Thread reloadThread;
   private volatile boolean running = true;
 
-  public AllocationFileLoaderService() {
-    this(SystemClock.getInstance());
+  public AllocationFileLoaderService(Listener reloadListener) {
+    this(reloadListener, SystemClock.getInstance());
   }
 
   private List<Permission> defaultPermissions;
 
-  public AllocationFileLoaderService(Clock clock) {
+  public AllocationFileLoaderService(Listener reloadListener, Clock clock) {
     super(AllocationFileLoaderService.class.getName());
     this.clock = clock;
+    this.reloadListener = reloadListener;
   }
 
   @Override
@@ -114,6 +115,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
       reloadThread = new Thread(() -> {
         while (running) {
           try {
+            reloadListener.onCheck();
             long time = clock.getTime();
             long lastModified =
                 fs.getFileStatus(allocFile).getModificationTime();
@@ -207,10 +209,6 @@ public class AllocationFileLoaderService extends 
AbstractService {
     return allocPath;
   }
 
-  public synchronized void setReloadListener(Listener reloadListener) {
-    this.reloadListener = reloadListener;
-  }
-
   /**
    * Updates the allocation list from the allocation config file. This file is
    * expected to be in the XML format specified in the design doc.
@@ -351,5 +349,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
 
   public interface Listener {
     void onReload(AllocationConfiguration info) throws IOException;
+
+    void onCheck();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 49d2166..e7da16f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -21,7 +21,9 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -34,6 +36,7 @@ import 
org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -56,6 +59,8 @@ public class FSLeafQueue extends FSQueue {
   // apps that are runnable
   private final List<FSAppAttempt> runnableApps = new ArrayList<>();
   private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>();
+  // assignedApps keeps track of applications that have no appAttempts
+  private final Set<ApplicationId> assignedApps = new HashSet<>();
   // get a lock with fair distribution for app list updates
   private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
   private final Lock readLock = rwl.readLock();
@@ -89,6 +94,9 @@ public class FSLeafQueue extends FSQueue {
       } else {
         nonRunnableApps.add(app);
       }
+      // when an appAttempt is created for an application, we'd like to move
+      // it over from assignedApps to either runnableApps or nonRunnableApps
+      assignedApps.remove(app.getApplicationId());
       incUsedResource(app.getResourceUsage());
     } finally {
       writeLock.unlock();
@@ -440,6 +448,15 @@ public class FSLeafQueue extends FSQueue {
     return numPendingApps;
   }
 
+  public int getNumAssignedApps() {
+    readLock.lock();
+    try {
+      return assignedApps.size();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   /**
    * TODO: Based on how frequently this is called, we might want to club
    * counting pending and active apps in the same method.
@@ -609,4 +626,18 @@ public class FSLeafQueue extends FSQueue {
         ", LastTimeAtMinShare: " + lastTimeAtMinShare +
         "}");
   }
+
+  /**
+   * This method is called when an application is assigned to this queue
+   * for book-keeping purposes (to be able to determine if the queue is empty).
+   * @param applicationId the application's id
+   */
+  public void addAssignedApp(ApplicationId applicationId) {
+    writeLock.lock();
+    try {
+      assignedApps.add(applicationId);
+    } finally {
+      writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 4babfd5..6b88a32 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -83,6 +83,7 @@ public abstract class FSQueue implements Queue, Schedulable {
   private long minSharePreemptionTimeout = Long.MAX_VALUE;
   private float fairSharePreemptionThreshold = 0.5f;
   private boolean preemptable = true;
+  private boolean isDynamic = true;
 
   public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
     this.name = name;
@@ -585,4 +586,12 @@ public abstract class FSQueue implements Queue, 
Schedulable {
    * @param sb the {code StringBuilder} which holds queue states
    */
   protected abstract void dumpStateInternal(StringBuilder sb);
+
+  public boolean isDynamic() {
+    return isDynamic;
+  }
+
+  public void setDynamic(boolean dynamic) {
+    this.isDynamic = dynamic;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 1c4bd51..4c84aa9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -99,6 +99,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -207,7 +208,8 @@ public class FairScheduler extends
   public FairScheduler() {
     super(FairScheduler.class.getName());
     context = new FSContext(this);
-    allocsLoader = new AllocationFileLoaderService();
+    allocsLoader =
+        new AllocationFileLoaderService(new AllocationReloadListener());
     queueMgr = new QueueManager(this);
     maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
   }
@@ -516,6 +518,7 @@ public class FairScheduler extends
           new SchedulerApplication<FSAppAttempt>(queue, user);
       applications.put(applicationId, application);
       queue.getMetrics().submitApp(user);
+      queue.addAssignedApp(applicationId);
 
       LOG.info("Accepted application " + applicationId + " from user: " + user
           + ", in queue: " + queue.getName()
@@ -1435,7 +1438,6 @@ public class FairScheduler extends
     }
 
     allocsLoader.init(conf);
-    allocsLoader.setReloadListener(new AllocationReloadListener());
     // If we fail to load allocations file on initialize, we want to fail
     // immediately.  After a successful load, exceptions on future reloads
     // will just result in leaving things as they are.
@@ -1589,6 +1591,7 @@ public class FairScheduler extends
       // Commit the reload; also create any queue defined in the alloc file
       // if it does not already exist, so it can be displayed on the web UI.
 
+      Set<String> removedStaticQueues = getRemovedStaticQueues(queueInfo);
       writeLock.lock();
       try {
         if (queueInfo == null) {
@@ -1599,6 +1602,7 @@ public class FairScheduler extends
           setQueueAcls(allocConf.getQueueAcls());
           allocConf.getDefaultSchedulingPolicy().initialize(getContext());
           queueMgr.updateAllocationConfiguration(allocConf);
+          queueMgr.setQueuesToDynamic(removedStaticQueues);
           applyChildDefaults();
           maxRunningEnforcer.updateRunnabilityOnReload();
         }
@@ -1606,6 +1610,27 @@ public class FairScheduler extends
         writeLock.unlock();
       }
     }
+
+    private Set<String> getRemovedStaticQueues(
+        AllocationConfiguration queueInfo) {
+      if (queueInfo == null || allocConf == null) {
+        return Collections.emptySet();
+      }
+      Set<String> removedStaticQueues = new HashSet<>();
+      for (Set<String> queues : allocConf.getConfiguredQueues().values()) {
+        removedStaticQueues.addAll(queues);
+      }
+      for (Set<String> queues : queueInfo.getConfiguredQueues().values()) {
+        removedStaticQueues.removeAll(queues);
+      }
+      return removedStaticQueues;
+    }
+
+    @Override
+    public void onCheck() {
+      queueMgr.removeEmptyDynamicQueues();
+      queueMgr.removePendingIncompatibleQueues();
+    }
   }
 
   private void setQueueAcls(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
index 8734877..632a842 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
@@ -22,13 +22,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.xml.parsers.ParserConfigurationException;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -52,6 +56,36 @@ public class QueueManager {
   public static final Log LOG = LogFactory.getLog(
     QueueManager.class.getName());
 
+  private final class IncompatibleQueueRemovalTask {
+
+    private final String queueToCreate;
+    private final FSQueueType queueType;
+
+    private IncompatibleQueueRemovalTask(String queueToCreate,
+        FSQueueType queueType) {
+      this.queueToCreate = queueToCreate;
+      this.queueType = queueType;
+    }
+
+    private void execute() {
+      Boolean removed =
+          removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null);
+      if (Boolean.TRUE.equals(removed)) {
+        FSQueue queue = getQueue(queueToCreate, true, queueType, false);
+        if (queue != null &&
+            // if queueToCreate is present in the allocation config, set it
+            // to static
+            scheduler.allocConf.configuredQueues.values().stream()
+            .anyMatch(s -> s.contains(queueToCreate))) {
+          queue.setDynamic(false);
+        }
+      }
+      if (!Boolean.FALSE.equals(removed)) {
+        incompatibleQueuesPendingRemoval.remove(this);
+      }
+    }
+  }
+
   public static final String ROOT_QUEUE = "root";
   
   private final FairScheduler scheduler;
@@ -59,6 +93,8 @@ public class QueueManager {
   private final Collection<FSLeafQueue> leafQueues = 
       new CopyOnWriteArrayList<FSLeafQueue>();
   private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+  private Set<IncompatibleQueueRemovalTask> incompatibleQueuesPendingRemoval =
+      new HashSet<>();
   private FSParentQueue rootQueue;
 
   public QueueManager(FairScheduler scheduler) {
@@ -75,10 +111,13 @@ public class QueueManager {
     // SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been
     // loaded yet.
     rootQueue = new FSParentQueue("root", scheduler, null);
+    rootQueue.setDynamic(false);
     queues.put(rootQueue.getName(), rootQueue);
 
     // Create the default queue
-    getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
+    FSLeafQueue defaultQueue =
+        getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
+    defaultQueue.setDynamic(false);
     // Recursively reinitialize to propagate queue properties
     rootQueue.reinit(true);
   }
@@ -121,7 +160,8 @@ public class QueueManager {
    */
   public boolean removeLeafQueue(String name) {
     name = ensureRootPrefix(name);
-    return removeEmptyIncompatibleQueues(name, FSQueueType.PARENT);
+    return !Boolean.FALSE.equals(
+        removeEmptyIncompatibleQueues(name, FSQueueType.PARENT).orElse(null));
   }
 
 
@@ -346,9 +386,13 @@ public class QueueManager {
    * 
    * We will never remove the root queue or the default queue in this way.
    *
-   * @return true if we can create queueToCreate or it already exists.
+   * @return Optional.of(Boolean.TRUE)  if there was an incompatible queue that
+   *                                    has been removed,
+   *         Optional.of(Boolean.FALSE) if there was an incompatible queue that
+   *                                    has not been removed,
+   *         Optional.empty()           if there is no incompatible queue.
    */
-  private boolean removeEmptyIncompatibleQueues(String queueToCreate,
+  private Optional<Boolean> removeEmptyIncompatibleQueues(String queueToCreate,
       FSQueueType queueType) {
     queueToCreate = ensureRootPrefix(queueToCreate);
 
@@ -357,7 +401,7 @@ public class QueueManager {
     if (queueToCreate.equals(ROOT_QUEUE) ||
         queueToCreate.startsWith(
             ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
-      return false;
+      return Optional.empty();
     }
 
     FSQueue queue = queues.get(queueToCreate);
@@ -365,19 +409,18 @@ public class QueueManager {
     if (queue != null) {
       if (queue instanceof FSLeafQueue) {
         if (queueType == FSQueueType.LEAF) {
-          // if queue is already a leaf then return true
-          return true;
+          return Optional.empty();
         }
         // remove incompatibility since queue is a leaf currently
         // needs to change to a parent.
-        return removeQueueIfEmpty(queue);
+        return Optional.of(removeQueueIfEmpty(queue));
       } else {
         if (queueType == FSQueueType.PARENT) {
-          return true;
+          return Optional.empty();
         }
         // If it's an existing parent queue and needs to change to leaf, 
         // remove it if it's empty.
-        return removeQueueIfEmpty(queue);
+        return Optional.of(removeQueueIfEmpty(queue));
       }
     }
 
@@ -389,11 +432,51 @@ public class QueueManager {
       String prefixString = queueToCreate.substring(0, sepIndex);
       FSQueue prefixQueue = queues.get(prefixString);
       if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
-        return removeQueueIfEmpty(prefixQueue);
+        return Optional.of(removeQueueIfEmpty(prefixQueue));
       }
       sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
     }
-    return true;
+    return Optional.empty();
+  }
+
+  /**
+   * Removes all empty dynamic queues (including empty dynamic parent queues).
+   */
+  public void removeEmptyDynamicQueues() {
+    synchronized (queues) {
+      Set<FSParentQueue> parentQueuesToCheck = new HashSet<>();
+      for (FSQueue queue : getQueues()) {
+        if (queue.isDynamic() && queue.getChildQueues().isEmpty()) {
+          boolean removed = removeQueueIfEmpty(queue);
+          if (removed && queue.getParent().isDynamic()) {
+            parentQueuesToCheck.add(queue.getParent());
+          }
+        }
+      }
+      while (!parentQueuesToCheck.isEmpty()) {
+        FSParentQueue queue = parentQueuesToCheck.iterator().next();
+        if (queue.getChildQueues().isEmpty()) {
+          removeQueue(queue);
+          if (queue.getParent().isDynamic()) {
+            parentQueuesToCheck.add(queue.getParent());
+          }
+        }
+        parentQueuesToCheck.remove(queue);
+      }
+    }
+  }
+
+  /**
+   * Re-checks incompatible queues that could not be removed earlier because
+   * they were not empty, and removes those that have since become empty.
+   */
+  public void removePendingIncompatibleQueues() {
+    synchronized (queues) {
+      for (IncompatibleQueueRemovalTask removalTask :
+          ImmutableSet.copyOf(incompatibleQueuesPendingRemoval)) {
+        removalTask.execute();
+      }
+    }
   }
 
   /**
@@ -435,7 +518,8 @@ public class QueueManager {
     if (queue instanceof FSLeafQueue) {
       FSLeafQueue leafQueue = (FSLeafQueue)queue;
       return queue.getNumRunnableApps() == 0 &&
-          leafQueue.getNumNonRunnableApps() == 0;
+          leafQueue.getNumNonRunnableApps() == 0 &&
+          leafQueue.getNumAssignedApps() == 0;
     } else {
       for (FSQueue child : queue.getChildQueues()) {
         if (!isEmpty(child)) {
@@ -501,21 +585,13 @@ public class QueueManager {
         LOG.error("Setting scheduling policies for existing queues failed!");
       }
 
-      for (String name : queueConf.getConfiguredQueues().get(
-              FSQueueType.LEAF)) {
-        if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
-          getLeafQueue(name, true, false);
-        }
-      }
+      ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf, FSQueueType.LEAF);
+
       // At this point all leaves and 'parents with
       // at least one child' would have been created.
       // Now create parents with no configured leaf.
-      for (String name : queueConf.getConfiguredQueues().get(
-          FSQueueType.PARENT)) {
-        if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
-          getParentQueue(name, true, false);
-        }
-      }
+      ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf,
+          FSQueueType.PARENT);
     }
 
     // Initialize all queues recursively
@@ -524,6 +600,35 @@ public class QueueManager {
     rootQueue.recomputeSteadyShares();
   }
 
+  private void ensureQueueExistsAndIsCompatibleAndIsStatic(
+      AllocationConfiguration queueConf, FSQueueType queueType) {
+    for (String name : queueConf.getConfiguredQueues().get(queueType)) {
+      Boolean removed =
+          removeEmptyIncompatibleQueues(name, queueType).orElse(null);
+      if (Boolean.FALSE.equals(removed)) {
+        incompatibleQueuesPendingRemoval.add(
+            new IncompatibleQueueRemovalTask(name, queueType));
+      } else {
+        FSQueue queue = getQueue(name, true, queueType, false);
+        if (queue != null) {
+          queue.setDynamic(false);
+        }
+      }
+    }
+  }
+
+  /**
+   * Marks the given queues as dynamic.
+   * @param queueNames The names of the queues to be set to dynamic
+   */
+  protected void setQueuesToDynamic(Set<String> queueNames) {
+    synchronized (queues) {
+      for (String queueName : queueNames) {
+        queues.get(queueName).setDynamic(true);
+      }
+    }
+  }
+
   /**
    * Check whether queue name is valid,
    * return true if it is valid, otherwise return false.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
index 8591d67..30b8a91 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService.Listener;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
@@ -32,6 +33,8 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Fai
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Test;
+import org.mockito.Mockito;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
@@ -79,7 +82,8 @@ public class TestAllocationFileLoaderService {
     fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(Mockito.mock(Listener.class));
     Path allocationFile = allocLoader.getAllocationFile(conf);
     assertEquals(fsAllocPath, allocationFile.toString());
     assertTrue(fs.exists(allocationFile));
@@ -92,7 +96,8 @@ public class TestAllocationFileLoaderService {
       throws UnsupportedFileSystemException {
     Configuration conf = new YarnConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(Mockito.mock(Listener.class));
 
     allocLoader.getAllocationFile(conf);
   }
@@ -105,7 +110,7 @@ public class TestAllocationFileLoaderService {
       conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
           TEST_FAIRSCHED_XML);
       AllocationFileLoaderService allocLoader =
-          new AllocationFileLoaderService();
+          new AllocationFileLoaderService(Mockito.mock(Listener.class));
       Path allocationFile = allocLoader.getAllocationFile(conf);
       assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
       assertTrue(fs.exists(allocationFile));
@@ -134,12 +139,11 @@ public class TestAllocationFileLoaderService {
     Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 
-    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(
-        clock);
+    ReloadListener confHolder = new ReloadListener();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder, clock);
     allocLoader.reloadIntervalMs = 5;
     allocLoader.init(conf);
-    ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
     allocLoader.reloadAllocations();
     AllocationConfiguration allocConf = confHolder.allocConf;
 
@@ -205,7 +209,9 @@ public class TestAllocationFileLoaderService {
   public void testAllocationFileParsing() throws Exception {
     Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
+    ReloadListener confHolder = new ReloadListener();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
 
     AllocationFileWriter
             .create()
@@ -278,8 +284,6 @@ public class TestAllocationFileLoaderService {
             .writeToFile(ALLOC_FILE);
 
     allocLoader.init(conf);
-    ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
     allocLoader.reloadAllocations();
     AllocationConfiguration queueConf = confHolder.allocConf;
 
@@ -427,7 +431,9 @@ public class TestAllocationFileLoaderService {
   public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
     Configuration conf = new Configuration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
+    ReloadListener confHolder = new ReloadListener();
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -473,8 +479,6 @@ public class TestAllocationFileLoaderService {
     out.close();
 
     allocLoader.init(conf);
-    ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
     allocLoader.reloadAllocations();
     AllocationConfiguration queueConf = confHolder.allocConf;
 
@@ -550,10 +554,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     allocLoader.reloadAllocations();
     AllocationConfiguration allocConf = confHolder.allocConf;
 
@@ -584,10 +588,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     allocLoader.reloadAllocations();
   }
 
@@ -608,10 +612,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     allocLoader.reloadAllocations();
   }
 
@@ -632,10 +636,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     allocLoader.reloadAllocations();
   }
 
@@ -654,10 +658,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     try {
       allocLoader.reloadAllocations();
     } catch (AllocationConfigurationException ex) {
@@ -685,10 +689,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     try {
       allocLoader.reloadAllocations();
     } catch (AllocationConfigurationException ex) {
@@ -714,10 +718,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     allocLoader.reloadAllocations();
     AllocationConfiguration queueConf = confHolder.allocConf;
     // Check whether queue 'parent' and 'child' are loaded successfully
@@ -745,10 +749,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     allocLoader.reloadAllocations();
   }
 
@@ -767,10 +771,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     allocLoader.reloadAllocations();
   }
 
@@ -793,10 +797,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     allocLoader.reloadAllocations();
 
     AllocationConfiguration allocConf = confHolder.allocConf;
@@ -853,10 +857,10 @@ public class TestAllocationFileLoaderService {
     out.println("</allocations>");
     out.close();
 
-    AllocationFileLoaderService allocLoader = new 
AllocationFileLoaderService();
-    allocLoader.init(conf);
     ReloadListener confHolder = new ReloadListener();
-    allocLoader.setReloadListener(confHolder);
+    AllocationFileLoaderService allocLoader =
+        new AllocationFileLoaderService(confHolder);
+    allocLoader.init(conf);
     allocLoader.reloadAllocations();
   }
 
@@ -867,5 +871,9 @@ public class TestAllocationFileLoaderService {
     public void onReload(AllocationConfiguration info) {
       allocConf = info;
     }
+
+    @Override
+    public void onCheck() { // no-op: nothing is recorded on a check
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86bc6425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
index eb2d402..3674ffb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
@@ -20,15 +20,22 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
 public class TestQueueManager {
@@ -305,4 +312,334 @@ public class TestQueueManager {
     assertEquals("createQueue() returned wrong queue",
         "root.queue1.queue2", q2.getName());
   }
+
+  @Test
+  public void testRemovalOfDynamicLeafQueue() {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+
+    queueManager.updateAllocationConfiguration(allocConf);
+
+    FSQueue q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", true);
+
+    assertNotNull("Queue root.test.childB.dynamic1 was not created", q1);
+    assertEquals("createQueue() returned wrong queue",
+        "root.test.childB.dynamic1", q1.getName());
+    assertTrue("root.test.childB.dynamic1 is not a dynamic queue",
+        q1.isDynamic());
+
+    // pretend an application was submitted: mark dynamic1 as non-empty
+    notEmptyQueues.add(q1);
+
+    // root.test.childB.dynamic1 is not empty and should not be removed
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", false);
+    assertNotNull("Queue root.test.childB.dynamic1 was deleted", q1);
+
+    // the application finishes; the next removeEmptyDynamicQueues() should
+    // clean root.test.childB.dynamic1 up, but keep its static parent
+    notEmptyQueues.remove(q1);
+
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", false);
+    assertNull("Queue root.test.childB.dynamic1 was not deleted", q1);
+    assertNotNull("The static parent of root.test.childB.dynamic1 was deleted",
+        queueManager.getParentQueue("root.test.childB", false));
+  }
+
+  @Test
+  public void testRemovalOfDynamicParentQueue() {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+
+    queueManager.updateAllocationConfiguration(allocConf);
+    // creating the leaf is expected to create its parent root.parent1 as well
+    FSQueue q1 = queueManager.getLeafQueue("root.parent1.dynamic1", true);
+
+    assertNotNull("Queue root.parent1.dynamic1 was not created", q1);
+    assertEquals("createQueue() returned wrong queue",
+        "root.parent1.dynamic1", q1.getName());
+    assertTrue("root.parent1.dynamic1 is not a dynamic queue", q1.isDynamic());
+
+    FSQueue p1 = queueManager.getParentQueue("root.parent1", false);
+    assertNotNull("Queue root.parent1 was not created", p1);
+    assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic());
+    // nothing was added to notEmptyQueues, so both queues should be cleaned up
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q1 = queueManager.getLeafQueue("root.parent1.dynamic1", false);
+    p1 = queueManager.getParentQueue("root.parent1", false);
+
+    assertNull("Queue root.parent1.dynamic1 was not deleted", q1);
+    assertNull("Queue root.parent1 was not deleted", p1);
+  }
+
+  @Test
+  public void testNonEmptyDynamicQueueBecomingStaticQueue() {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+
+    queueManager.updateAllocationConfiguration(allocConf);
+
+    FSLeafQueue q1 = queueManager.getLeafQueue("root.leaf1", true);
+
+    assertNotNull("Queue root.leaf1 was not created", q1);
+    assertEquals("createQueue() returned wrong queue",
+        "root.leaf1", q1.getName());
+    assertTrue("root.leaf1 is not a dynamic queue", q1.isDynamic());
+
+    // pretend that we submitted an app to the queue
+    notEmptyQueues.add(q1);
+
+    // non-empty queues should not be deleted
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q1 = queueManager.getLeafQueue("root.leaf1", false);
+    assertNotNull("Queue root.leaf1 was deleted", q1);
+
+    // next we add leaf1 under root in the allocation config
+    allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.leaf1");
+    queueManager.updateAllocationConfiguration(allocConf);
+
+    // updateAllocationConfiguration() should make root.leaf1 a static queue
+    assertFalse("root.leaf1 is not a static queue", q1.isDynamic());
+
+    // the application has finished and the queue is empty, but leaf1 is a
+    // static queue at this point, so it must not be affected by
+    // removeEmptyDynamicQueues()
+    notEmptyQueues.clear();
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q1 = queueManager.getLeafQueue("root.leaf1", false);
+    assertNotNull("Queue root.leaf1 was deleted", q1);
+    assertFalse("root.leaf1 is not a static queue", q1.isDynamic());
+  }
+
+  @Test
+  public void testNonEmptyStaticQueueBecomingDynamicQueue() {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+    queueManager.updateAllocationConfiguration(allocConf);
+    // root.test.childA must already exist as a static queue at this point
+    FSLeafQueue q1 = queueManager.getLeafQueue("root.test.childA", false);
+
+    assertNotNull("Queue root.test.childA does not exist", q1);
+    assertEquals("createQueue() returned wrong queue",
+        "root.test.childA", q1.getName());
+    assertFalse("root.test.childA is not a static queue", q1.isDynamic());
+
+    // pretend that we submitted an app to the queue
+    notEmptyQueues.add(q1);
+
+    // the next removeEmptyDynamicQueues() call should not modify
+    // root.test.childA: it must still exist and still be static
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q1 = queueManager.getLeafQueue("root.test.childA", false);
+    assertNotNull("Queue root.test.childA was deleted", q1);
+    assertFalse("root.test.childA is not a static queue", q1.isDynamic());
+
+    // next we remove all queues from the allocation config,
+    // this causes all queues to change to dynamic
+    for (Set<String> queueNames : allocConf.configuredQueues.values()) {
+      queueManager.setQueuesToDynamic(queueNames);
+      queueNames.clear();
+    }
+    queueManager.updateAllocationConfiguration(allocConf);
+
+    q1 = queueManager.getLeafQueue("root.test.childA", false);
+    assertNotNull("Queue root.test.childA was deleted", q1);
+    assertTrue("root.test.childA is not a dynamic queue", q1.isDynamic());
+
+    // the application finished, so the queue no longer counts as non-empty;
+    // the next removeEmptyDynamicQueues() call should remove the queues
+    notEmptyQueues.remove(q1);
+
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+
+    q1 = queueManager.getLeafQueue("root.test.childA", false);
+    assertNull("Queue root.test.childA was not deleted", q1);
+
+    FSParentQueue p1 = queueManager.getParentQueue("root.test", false);
+    assertNull("Queue root.test was not deleted", p1);
+  }
+
+  @Test
+  public void testRemovalOfChildlessParentQueue() {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+    queueManager.updateAllocationConfiguration(allocConf);
+    // root.test.childB must already exist as a static parent queue
+    FSParentQueue q1 = queueManager.getParentQueue("root.test.childB", false);
+
+    assertNotNull("Queue root.test.childB was not created", q1);
+    assertEquals("createQueue() returned wrong queue",
+        "root.test.childB", q1.getName());
+    assertFalse("root.test.childB is a dynamic queue", q1.isDynamic());
+
+    // static queues should not be deleted
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q1 = queueManager.getParentQueue("root.test.childB", false);
+    assertNotNull("Queue root.test.childB was deleted", q1);
+
+    // next we remove root.test.childB from the allocation config
+    allocConf.configuredQueues.get(FSQueueType.PARENT)
+        .remove("root.test.childB");
+    queueManager.updateAllocationConfiguration(allocConf);
+    queueManager.setQueuesToDynamic(Collections.singleton("root.test.childB"));
+
+    // the next removeEmptyDynamicQueues() call should clean
+    // root.test.childB up, so look up that queue (not an unrelated one)
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q1 = queueManager.getParentQueue("root.test.childB", false);
+    assertNull("Queue root.test.childB was not deleted", q1);
+  }
+
+  @Test
+  public void testQueueTypeChange() {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+    queueManager.updateAllocationConfiguration(allocConf);
+    // a leaf created on demand should come with a dynamic parent, root.parent1
+    FSQueue q1 = queueManager.getLeafQueue("root.parent1.leaf1", true);
+    assertNotNull("Queue root.parent1.leaf1 was not created", q1);
+    assertEquals("createQueue() returned wrong queue",
+        "root.parent1.leaf1", q1.getName());
+    assertTrue("root.parent1.leaf1 is not a dynamic queue", q1.isDynamic());
+
+    FSQueue p1 = queueManager.getParentQueue("root.parent1", false);
+    assertNotNull("Queue root.parent1 was not created", p1);
+    assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic());
+
+    // adding root.parent1.leaf1 and root.parent1 to the allocation config
+    allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.parent1");
+    allocConf.configuredQueues.get(FSQueueType.LEAF)
+        .add("root.parent1.leaf1");
+
+    // updateAllocationConfiguration() should change both queues over to static
+    queueManager.updateAllocationConfiguration(allocConf);
+    q1 = queueManager.getLeafQueue("root.parent1.leaf1", false);
+    assertFalse("root.parent1.leaf1 is not a static queue", q1.isDynamic());
+    p1 = queueManager.getParentQueue("root.parent1", false);
+    assertFalse("root.parent1 is not a static queue", p1.isDynamic());
+
+    // removing root.parent1.leaf1 and root.parent1 from the allocation
+    // config
+    allocConf.configuredQueues.get(FSQueueType.PARENT).remove("root.parent1");
+    allocConf.configuredQueues.get(FSQueueType.LEAF)
+        .remove("root.parent1.leaf1");
+
+    // updateAllocationConfiguration() followed by setQueuesToDynamic()
+    // should change both queues back to dynamic
+    queueManager.updateAllocationConfiguration(allocConf);
+    queueManager.setQueuesToDynamic(
+        ImmutableSet.of("root.parent1", "root.parent1.leaf1"));
+    q1 = queueManager.getLeafQueue("root.parent1.leaf1", false);
+    assertTrue("root.parent1.leaf1 is not a dynamic queue", q1.isDynamic());
+    p1 = queueManager.getParentQueue("root.parent1", false);
+    assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic());
+  }
+
+  @Test
+  public void testApplicationAssignmentPreventsRemovalOfDynamicQueue()
+      throws Exception {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+    queueManager = new QueueManager(scheduler);
+    queueManager.initialize(conf);
+    queueManager.updateAllocationConfiguration(allocConf);
+    // NOTE(review): fresh QueueManager, presumably for the real isEmpty()
+    FSLeafQueue q = queueManager.getLeafQueue("root.leaf1", true);
+    assertNotNull("root.leaf1 does not exist", q);
+    assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q));
+
+    // assign an application (without an appAttempt so far) to the queue;
+    // removeEmptyDynamicQueues() should not remove the queue
+    ApplicationId applicationId = ApplicationId.newInstance(1L, 0);
+    q.addAssignedApp(applicationId);
+    q = queueManager.getLeafQueue("root.leaf1", false);
+    assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
+
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q = queueManager.getLeafQueue("root.leaf1", false);
+    assertNotNull("root.leaf1 has been removed", q);
+    assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
+
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(applicationId, 0);
+    ActiveUsersManager activeUsersManager =
+        Mockito.mock(ActiveUsersManager.class);
+    RMContext rmContext = Mockito.mock(RMContext.class);
+
+    // the appAttempt is created and added to the queue;
+    // removeEmptyDynamicQueues() should still not remove the queue
+    FSAppAttempt appAttempt = new FSAppAttempt(scheduler, applicationAttemptId,
+        "a_user", q, activeUsersManager, rmContext);
+    q.addApp(appAttempt, true);
+    queueManager.removeEmptyDynamicQueues();
+    q = queueManager.getLeafQueue("root.leaf1", false);
+    assertNotNull("root.leaf1 has been removed", q);
+    assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
+
+    // the appAttempt finished, the queue should be empty
+    q.removeApp(appAttempt);
+    q = queueManager.getLeafQueue("root.leaf1", false);
+    assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q));
+
+    // removeEmptyDynamicQueues() should remove the queue
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q = queueManager.getLeafQueue("root.leaf1", false);
+    assertNull("root.leaf1 has not been removed", q);
+  }
+
+  @Test
+  public void testRemovalOfIncompatibleNonEmptyQueue()
+      throws Exception {
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+    allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a");
+    scheduler.allocConf = allocConf;
+    queueManager.updateAllocationConfiguration(allocConf);
+
+    FSLeafQueue q = queueManager.getLeafQueue("root.a", true);
+    assertNotNull("root.a does not exist", q);
+    assertTrue("root.a is not empty", queueManager.isEmpty(q));
+
+    // pretend an application starts running on root.a
+    notEmptyQueues.add(q);
+    q = queueManager.getLeafQueue("root.a", false);
+    assertNotNull("root.a does not exist", q);
+    assertFalse("root.a is empty", queueManager.isEmpty(q));
+
+    // root.a should not be removed by removeEmptyDynamicQueues or by
+    // removePendingIncompatibleQueues
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q = queueManager.getLeafQueue("root.a", false);
+    assertNotNull("root.a does not exist", q);
+
+    // introduce an incompatibility: root.a is now configured as a parent
+    allocConf.configuredQueues.get(FSQueueType.LEAF).remove("root.a");
+    allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.a");
+    allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a.b");
+    queueManager.updateAllocationConfiguration(allocConf);
+
+    // since root.a has running applications, it should be still a leaf queue
+    q = queueManager.getLeafQueue("root.a", false);
+    assertNotNull("root.a has been removed", q);
+    assertFalse("root.a is empty", queueManager.isEmpty(q));
+
+    // removePendingIncompatibleQueues should still keep root.a as a leaf queue
+    queueManager.removePendingIncompatibleQueues();
+    q = queueManager.getLeafQueue("root.a", false);
+    assertNotNull("root.a has been removed", q);
+    assertFalse("root.a is empty", queueManager.isEmpty(q));
+
+    // once the application finishes, root.a should become a parent queue
+    notEmptyQueues.clear();
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    FSParentQueue p = queueManager.getParentQueue("root.a", false);
+    assertNotNull("root.a does not exist", p);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to