http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 922d711..6129772 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -85,11 +86,11 @@ public class LeafQueue extends AbstractCSQueue {
   private static final Log LOG = LogFactory.getLog(LeafQueue.class);
 
   private float absoluteUsedCapacity = 0.0f;
-  private int userLimit;
-  private float userLimitFactor;
+  private volatile int userLimit;
+  private volatile float userLimitFactor;
 
   protected int maxApplications;
-  protected int maxApplicationsPerUser;
+  protected volatile int maxApplicationsPerUser;
   
   private float maxAMResourcePerQueuePercent;
 
@@ -97,15 +98,15 @@ public class LeafQueue extends AbstractCSQueue {
   private volatile boolean rackLocalityFullReset;
 
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
-      new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
+      new ConcurrentHashMap<>();
 
   private Priority defaultAppPriorityPerQueue;
 
-  private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
+  private final OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy;
 
   private volatile float minimumAllocationFactor;
 
-  private Map<String, User> users = new HashMap<String, User>();
+  private Map<String, User> users = new ConcurrentHashMap<>();
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
@@ -122,7 +123,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
 
-  private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
+  private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
 
   // Summation of consumed ratios for all users in queue
   private float totalUserConsumedRatio = 0;
@@ -131,7 +132,7 @@ public class LeafQueue extends AbstractCSQueue {
   // record all ignore partition exclusivityRMContainer, this will be used to do
   // preemption, key is the partition of the RMContainer allocated on
   private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
-      new HashMap<>();
+      new ConcurrentHashMap<>();
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs,
@@ -154,125 +155,125 @@ public class LeafQueue extends AbstractCSQueue {
     setupQueueConfigs(cs.getClusterResource());
   }
 
-  protected synchronized void setupQueueConfigs(Resource clusterResource)
+  protected void setupQueueConfigs(Resource clusterResource)
       throws IOException {
-    super.setupQueueConfigs(clusterResource);
-    
-    this.lastClusterResource = clusterResource;
-    
-    this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource);
-    
-    // Initialize headroom info, also used for calculating application 
-    // master resource limits.  Since this happens during queue initialization
-    // and all queues may not be realized yet, we'll use (optimistic) 
-    // absoluteMaxCapacity (it will be replaced with the more accurate 
-    // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
-    setQueueResourceLimitsInfo(clusterResource);
+    try {
+      writeLock.lock();
+      super.setupQueueConfigs(clusterResource);
 
-    CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+      this.lastClusterResource = clusterResource;
 
-    setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
+      this.cachedResourceLimitsForHeadroom = new ResourceLimits(
+          clusterResource);
 
-    userLimit = conf.getUserLimit(getQueuePath());
-    userLimitFactor = conf.getUserLimitFactor(getQueuePath());
+      // Initialize headroom info, also used for calculating application
+      // master resource limits.  Since this happens during queue initialization
+      // and all queues may not be realized yet, we'll use (optimistic)
+      // absoluteMaxCapacity (it will be replaced with the more accurate
+      // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
+      setQueueResourceLimitsInfo(clusterResource);
 
-    maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
-    if (maxApplications < 0) {
-      int maxSystemApps = conf.getMaximumSystemApplications();
-      maxApplications =
-          (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
-    }
-    maxApplicationsPerUser = Math.min(maxApplications,
-        (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor));
-    
-    maxAMResourcePerQueuePercent =
-        conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
-
-    if (!SchedulerUtils.checkQueueLabelExpression(
-        this.accessibleLabels, this.defaultLabelExpression, null)) {
-      throw new IOException("Invalid default label expression of "
-          + " queue="
-          + getQueueName()
-          + " doesn't have permission to access all labels "
-          + "in default label expression. labelExpression of resource request="
-          + (this.defaultLabelExpression == null ? ""
-              : this.defaultLabelExpression)
-          + ". Queue labels="
-          + (getAccessibleNodeLabels() == null ? "" : StringUtils.join(
-              getAccessibleNodeLabels().iterator(), ',')));
-    }
-    
-    nodeLocalityDelay = conf.getNodeLocalityDelay();
-    rackLocalityFullReset = conf.getRackLocalityFullReset();
+      CapacitySchedulerConfiguration conf = csContext.getConfiguration();
 
-    // re-init this since max allocation could have changed
-    this.minimumAllocationFactor =
-        Resources.ratio(resourceCalculator,
-            Resources.subtract(maximumAllocation, minimumAllocation),
-            maximumAllocation);
+      setOrderingPolicy(
+          conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
 
-    StringBuilder aclsString = new StringBuilder();
-    for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
-      aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
-    }
+      userLimit = conf.getUserLimit(getQueuePath());
+      userLimitFactor = conf.getUserLimitFactor(getQueuePath());
 
-    StringBuilder labelStrBuilder = new StringBuilder(); 
-    if (accessibleLabels != null) {
-      for (String s : accessibleLabels) {
-        labelStrBuilder.append(s);
-        labelStrBuilder.append(",");
+      maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
+      if (maxApplications < 0) {
+        int maxSystemApps = conf.getMaximumSystemApplications();
+        maxApplications =
+            (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
       }
-    }
+      maxApplicationsPerUser = Math.min(maxApplications,
+          (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor));
+
+      maxAMResourcePerQueuePercent =
+          conf.getMaximumApplicationMasterResourcePerQueuePercent(
+              getQueuePath());
+
+      if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
+          this.defaultLabelExpression, null)) {
+        throw new IOException(
+            "Invalid default label expression of " + " queue=" + getQueueName()
+                + " doesn't have permission to access all labels "
+                + "in default label expression. labelExpression of resource request="
+                + (this.defaultLabelExpression == null ?
+                "" :
+                this.defaultLabelExpression) + ". Queue labels=" + (
+                getAccessibleNodeLabels() == null ?
+                    "" :
+                    StringUtils
+                        .join(getAccessibleNodeLabels().iterator(), ',')));
+      }
+
+      nodeLocalityDelay = conf.getNodeLocalityDelay();
+      rackLocalityFullReset = conf.getRackLocalityFullReset();
+
+      // re-init this since max allocation could have changed
+      this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
+          Resources.subtract(maximumAllocation, minimumAllocation),
+          maximumAllocation);
 
-    defaultAppPriorityPerQueue = Priority.newInstance(conf
-        .getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
-
-    LOG.info("Initializing " + queueName + "\n" +
-        "capacity = " + queueCapacities.getCapacity() +
-        " [= (float) configuredCapacity / 100 ]" + "\n" + 
-        "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity() +
-        " [= parentAbsoluteCapacity * capacity ]" + "\n" +
-        "maxCapacity = " + queueCapacities.getMaximumCapacity() +
-        " [= configuredMaxCapacity ]" + "\n" +
-        "absoluteMaxCapacity = " + queueCapacities.getAbsoluteMaximumCapacity() +
-        " [= 1.0 maximumCapacity undefined, " +
-        "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + 
-        "\n" +
-        "userLimit = " + userLimit +
-        " [= configuredUserLimit ]" + "\n" +
-        "userLimitFactor = " + userLimitFactor +
-        " [= configuredUserLimitFactor ]" + "\n" +
-        "maxApplications = " + maxApplications +
-        " [= configuredMaximumSystemApplicationsPerQueue or" + 
-        " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" + 
-        "\n" +
-        "maxApplicationsPerUser = " + maxApplicationsPerUser +
-        " [= (int)(maxApplications * (userLimit / 100.0f) * " +
-        "userLimitFactor) ]" + "\n" +
-        "usedCapacity = " + queueCapacities.getUsedCapacity() +
-        " [= usedResourcesMemory / " +
-        "(clusterResourceMemory * absoluteCapacity)]" + "\n" +
-        "absoluteUsedCapacity = " + absoluteUsedCapacity +
-        " [= usedResourcesMemory / clusterResourceMemory]" + "\n" +
-        "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent +
-        " [= configuredMaximumAMResourcePercent ]" + "\n" +
-        "minimumAllocationFactor = " + minimumAllocationFactor +
-        " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " +
-        "maximumAllocationMemory ]" + "\n" +
-        "maximumAllocation = " + maximumAllocation +
-        " [= configuredMaxAllocation ]" + "\n" +
-        "numContainers = " + numContainers +
-        " [= currentNumContainers ]" + "\n" +
-        "state = " + state +
-        " [= configuredState ]" + "\n" +
-        "acls = " + aclsString +
-        " [= configuredAcls ]" + "\n" + 
-        "nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
-        "labels=" + labelStrBuilder.toString() + "\n" +
-        "reservationsContinueLooking = " +
-        reservationsContinueLooking + "\n" +
-        "preemptionDisabled = " + getPreemptionDisabled() + "\n" +
-        "defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue);
+      StringBuilder aclsString = new StringBuilder();
+      for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
+        aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
+      }
+
+      StringBuilder labelStrBuilder = new StringBuilder();
+      if (accessibleLabels != null) {
+        for (String s : accessibleLabels) {
+          labelStrBuilder.append(s);
+          labelStrBuilder.append(",");
+        }
+      }
+
+      defaultAppPriorityPerQueue = Priority.newInstance(
+          conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
+
+      LOG.info(
+          "Initializing " + queueName + "\n" + "capacity = " + queueCapacities
+              .getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n"
+              + "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity()
+              + " [= parentAbsoluteCapacity * capacity ]" + "\n"
+              + "maxCapacity = " + queueCapacities.getMaximumCapacity()
+              + " [= configuredMaxCapacity ]" + "\n" + "absoluteMaxCapacity = "
+              + queueCapacities.getAbsoluteMaximumCapacity()
+              + " [= 1.0 maximumCapacity undefined, "
+              + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]"
+              + "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]"
+              + "\n" + "userLimitFactor = " + userLimitFactor
+              + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = "
+              + maxApplications
+              + " [= configuredMaximumSystemApplicationsPerQueue or"
+              + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]"
+              + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser
+              + " [= (int)(maxApplications * (userLimit / 100.0f) * "
+              + "userLimitFactor) ]" + "\n" + "usedCapacity = "
+              + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / "
+              + "(clusterResourceMemory * absoluteCapacity)]" + "\n"
+              + "absoluteUsedCapacity = " + absoluteUsedCapacity
+              + " [= usedResourcesMemory / clusterResourceMemory]" + "\n"
+              + "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent
+              + " [= configuredMaximumAMResourcePercent ]" + "\n"
+              + "minimumAllocationFactor = " + minimumAllocationFactor
+              + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / "
+              + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
+              + maximumAllocation + " [= configuredMaxAllocation ]" + "\n"
+              + "numContainers = " + numContainers
+              + " [= currentNumContainers ]" + "\n" + "state = " + state
+              + " [= configuredState ]" + "\n" + "acls = " + aclsString
+              + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = "
+              + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder
+              .toString() + "\n" + "reservationsContinueLooking = "
+              + reservationsContinueLooking + "\n" + "preemptionDisabled = "
+              + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
+              + defaultAppPriorityPerQueue);
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
@@ -300,7 +301,7 @@ public class LeafQueue extends AbstractCSQueue {
     return maxApplications;
   }
 
-  public synchronized int getMaxApplicationsPerUser() {
+  public int getMaxApplicationsPerUser() {
     return maxApplicationsPerUser;
   }
 
@@ -318,7 +319,8 @@ public class LeafQueue extends AbstractCSQueue {
    * Set user limit - used only for testing.
    * @param userLimit new user limit
    */
-  synchronized void setUserLimit(int userLimit) {
+  @VisibleForTesting
+  void setUserLimit(int userLimit) {
     this.userLimit = userLimit;
   }
 
@@ -326,50 +328,74 @@ public class LeafQueue extends AbstractCSQueue {
    * Set user limit factor - used only for testing.
    * @param userLimitFactor new user limit factor
    */
-  synchronized void setUserLimitFactor(float userLimitFactor) {
+  @VisibleForTesting
+  void setUserLimitFactor(float userLimitFactor) {
     this.userLimitFactor = userLimitFactor;
   }
 
   @Override
-  public synchronized int getNumApplications() {
-    return getNumPendingApplications() + getNumActiveApplications();
-  }
-
-  public synchronized int getNumPendingApplications() {
-    return pendingOrderingPolicy.getNumSchedulableEntities();
+  public int getNumApplications() {
+    try {
+      readLock.lock();
+      return getNumPendingApplications() + getNumActiveApplications();
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  public synchronized int getNumActiveApplications() {
-    return orderingPolicy.getNumSchedulableEntities();
+  public int getNumPendingApplications() {
+    try {
+      readLock.lock();
+      return pendingOrderingPolicy.getNumSchedulableEntities();
+    } finally {
+      readLock.unlock();
+    }
   }
 
-  @Private
-  public synchronized int getNumApplications(String user) {
-    return getUser(user).getTotalApplications();
+  public int getNumActiveApplications() {
+    try {
+      readLock.lock();
+      return orderingPolicy.getNumSchedulableEntities();
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Private
-  public synchronized int getNumPendingApplications(String user) {
-    return getUser(user).getPendingApplications();
+  public int getNumPendingApplications(String user) {
+    try {
+      readLock.lock();
+      User u = getUser(user);
+      if (null == u) {
+        return 0;
+      }
+      return u.getPendingApplications();
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Private
-  public synchronized int getNumActiveApplications(String user) {
-    return getUser(user).getActiveApplications();
-  }
-
-  @Override
-  public synchronized QueueState getState() {
-    return state;
+  public int getNumActiveApplications(String user) {
+    try {
+      readLock.lock();
+      User u = getUser(user);
+      if (null == u) {
+        return 0;
+      }
+      return u.getActiveApplications();
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Private
-  public synchronized int getUserLimit() {
+  public int getUserLimit() {
     return userLimit;
   }
 
   @Private
-  public synchronized float getUserLimitFactor() {
+  public float getUserLimitFactor() {
     return userLimitFactor;
   }
 
@@ -381,112 +407,145 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized List<QueueUserACLInfo> 
+  public List<QueueUserACLInfo>
   getQueueUserAclInfo(UserGroupInformation user) {
-    QueueUserACLInfo userAclInfo = 
-      recordFactory.newRecordInstance(QueueUserACLInfo.class);
-    List<QueueACL> operations = new ArrayList<QueueACL>();
-    for (QueueACL operation : QueueACL.values()) {
-      if (hasAccess(operation, user)) {
-        operations.add(operation);
+    try {
+      readLock.lock();
+      QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
+          QueueUserACLInfo.class);
+      List<QueueACL> operations = new ArrayList<>();
+      for (QueueACL operation : QueueACL.values()) {
+        if (hasAccess(operation, user)) {
+          operations.add(operation);
+        }
       }
+
+      userAclInfo.setQueueName(getQueueName());
+      userAclInfo.setUserAcls(operations);
+      return Collections.singletonList(userAclInfo);
+    } finally {
+      readLock.unlock();
     }
 
-    userAclInfo.setQueueName(getQueueName());
-    userAclInfo.setUserAcls(operations);
-    return Collections.singletonList(userAclInfo);
   }
 
   public String toString() {
-    return queueName + ": " + 
-        "capacity=" + queueCapacities.getCapacity() + ", " + 
-        "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + 
-        "usedResources=" + queueUsage.getUsed() +  ", " +
-        "usedCapacity=" + getUsedCapacity() + ", " + 
-        "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " +
-        "numApps=" + getNumApplications() + ", " + 
-        "numContainers=" + getNumContainers();  
+    try {
+      readLock.lock();
+      return queueName + ": " + "capacity=" + queueCapacities.getCapacity()
+          + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity()
+          + ", " + "usedResources=" + queueUsage.getUsed() + ", "
+          + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity="
+          + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications()
+          + ", " + "numContainers=" + getNumContainers();
+    } finally {
+      readLock.unlock();
+    }
+
   }
-  
+
   @VisibleForTesting
-  public synchronized void setNodeLabelManager(RMNodeLabelsManager mgr) {
-    this.labelManager = mgr;
+  public User getUser(String userName) {
+    return users.get(userName);
   }
 
-  @VisibleForTesting
-  public synchronized User getUser(String userName) {
-    User user = users.get(userName);
-    if (user == null) {
-      user = new User();
-      users.put(userName, user);
+  // Get and add user if absent
+  private User getUserAndAddIfAbsent(String userName) {
+    try {
+      writeLock.lock();
+      User u = users.get(userName);
+      if (null == u) {
+        u = new User();
+        users.put(userName, u);
+      }
+      return u;
+    } finally {
+      writeLock.unlock();
     }
-    return user;
   }
 
   /**
    * @return an ArrayList of UserInfo objects who are active in this queue
    */
-  public synchronized ArrayList<UserInfo> getUsers() {
-    ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
-    for (Map.Entry<String, User> entry : users.entrySet()) {
-      User user = entry.getValue();
-      usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(user
-          .getAllUsed()), user.getActiveApplications(), user
-          .getPendingApplications(), Resources.clone(user
-          .getConsumedAMResources()), Resources.clone(user
-          .getUserResourceLimit()), user.getResourceUsage()));
+  public ArrayList<UserInfo> getUsers() {
+    try {
+      readLock.lock();
+      ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
+      for (Map.Entry<String, User> entry : users.entrySet()) {
+        User user = entry.getValue();
+        usersToReturn.add(
+            new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
+                user.getActiveApplications(), user.getPendingApplications(),
+                Resources.clone(user.getConsumedAMResources()),
+                Resources.clone(user.getUserResourceLimit()),
+                user.getResourceUsage()));
+      }
+      return usersToReturn;
+    } finally {
+      readLock.unlock();
     }
-    return usersToReturn;
   }
 
   @Override
-  public synchronized void reinitialize(
+  public void reinitialize(
       CSQueue newlyParsedQueue, Resource clusterResource) 
   throws IOException {
-    // Sanity check
-    if (!(newlyParsedQueue instanceof LeafQueue) || 
-        !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
-      throw new IOException("Trying to reinitialize " + getQueuePath() + 
-          " from " + newlyParsedQueue.getQueuePath());
-    }
-
-    LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
+    try {
+      writeLock.lock();
+      // Sanity check
+      if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
+          .getQueuePath().equals(getQueuePath())) {
+        throw new IOException(
+            "Trying to reinitialize " + getQueuePath() + " from "
+                + newlyParsedQueue.getQueuePath());
+      }
 
-    // don't allow the maximum allocation to be decreased in size
-    // since we have already told running AM's the size
-    Resource oldMax = getMaximumAllocation();
-    Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
-    if (newMax.getMemorySize() < oldMax.getMemorySize()
-        || newMax.getVirtualCores() < oldMax.getVirtualCores()) {
-      throw new IOException(
-          "Trying to reinitialize "
-              + getQueuePath()
-              + " the maximum allocation size can not be decreased!"
-              + " Current setting: " + oldMax
-              + ", trying to set it to: " + newMax);
-    }
+      LeafQueue newlyParsedLeafQueue = (LeafQueue) newlyParsedQueue;
+
+      // don't allow the maximum allocation to be decreased in size
+      // since we have already told running AM's the size
+      Resource oldMax = getMaximumAllocation();
+      Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
+      if (newMax.getMemorySize() < oldMax.getMemorySize()
+          || newMax.getVirtualCores() < oldMax.getVirtualCores()) {
+        throw new IOException("Trying to reinitialize " + getQueuePath()
+            + " the maximum allocation size can not be decreased!"
+            + " Current setting: " + oldMax + ", trying to set it to: "
+            + newMax);
+      }
 
-    setupQueueConfigs(clusterResource);
+      setupQueueConfigs(clusterResource);
 
-    // queue metrics are updated, more resource may be available
-    // activate the pending applications if possible
-    activateApplications();
+      // queue metrics are updated, more resource may be available
+      // activate the pending applications if possible
+      activateApplications();
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
   public void submitApplicationAttempt(FiCaSchedulerApp application,
       String userName) {
     // Careful! Locking order is important!
-    synchronized (this) {
-      User user = getUser(userName);
+    try {
+      writeLock.lock();
+
+      // TODO, should use getUser, use this method just to avoid UT failure
+      // which is caused by wrong invoking order, will fix UT separately
+      User user = getUserAndAddIfAbsent(userName);
+
       // Add the attempt to our data-structures
       addApplicationAttempt(application, user);
+    } finally {
+      writeLock.unlock();
     }
 
     // We don't want to update metrics for move app
     if (application.isPending()) {
       metrics.submitAppAttempt(userName);
     }
+
     getParent().submitApplicationAttempt(application, userName);
   }
 
@@ -494,37 +553,38 @@ public class LeafQueue extends AbstractCSQueue {
   public void submitApplication(ApplicationId applicationId, String userName,
       String queue)  throws AccessControlException {
     // Careful! Locking order is important!
-
-    User user = null;
-    synchronized (this) {
-
+    try {
+      writeLock.lock();
       // Check if the queue is accepting jobs
       if (getState() != QueueState.RUNNING) {
-        String msg = "Queue " + getQueuePath() +
-        " is STOPPED. Cannot accept submission of application: " + applicationId;
+        String msg = "Queue " + getQueuePath()
+            + " is STOPPED. Cannot accept submission of application: "
+            + applicationId;
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
 
       // Check submission limits for queues
       if (getNumApplications() >= getMaxApplications()) {
-        String msg = "Queue " + getQueuePath() + 
-        " already has " + getNumApplications() + " applications," +
-        " cannot accept submission of application: " + applicationId;
+        String msg =
+            "Queue " + getQueuePath() + " already has " + getNumApplications()
+                + " applications,"
+                + " cannot accept submission of application: " + applicationId;
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
 
       // Check submission limits for the user on this queue
-      user = getUser(userName);
+      User user = getUserAndAddIfAbsent(userName);
       if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
-        String msg = "Queue " + getQueuePath() + 
-        " already has " + user.getTotalApplications() + 
-        " applications from user " + userName + 
-        " cannot accept submission of application: " + applicationId;
+        String msg = "Queue " + getQueuePath() + " already has " + user
+            .getTotalApplications() + " applications from user " + userName
+            + " cannot accept submission of application: " + applicationId;
         LOG.info(msg);
         throw new AccessControlException(msg);
       }
+    } finally {
+      writeLock.unlock();
     }
 
     // Inform the parent queue
@@ -546,214 +606,237 @@ public class LeafQueue extends AbstractCSQueue {
     return queueUsage.getAMLimit(nodePartition);
   }
 
-  public synchronized Resource calculateAndGetAMResourceLimit() {
+  @VisibleForTesting
+  public Resource calculateAndGetAMResourceLimit() {
     return calculateAndGetAMResourceLimitPerPartition(
         RMNodeLabelsManager.NO_LABEL);
   }
 
   @VisibleForTesting
-  public synchronized Resource getUserAMResourceLimit() {
+  public Resource getUserAMResourceLimit() {
      return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
   }
 
-  public synchronized Resource getUserAMResourceLimitPerPartition(
+  public Resource getUserAMResourceLimitPerPartition(
       String nodePartition) {
-    /*
-     * The user am resource limit is based on the same approach as the user
-     * limit (as it should represent a subset of that). This means that it uses
-     * the absolute queue capacity (per partition) instead of the max and is
-     * modified by the userlimit and the userlimit factor as is the userlimit
-     */
-    float effectiveUserLimit = Math.max(userLimit / 100.0f,
-        1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
-
-    Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
-        resourceCalculator,
-        labelManager.getResourceByLabel(nodePartition, lastClusterResource),
-        queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation);
-
-    Resource userAMLimit = Resources.multiplyAndNormalizeUp(resourceCalculator,
-        queuePartitionResource,
-        queueCapacities.getMaxAMResourcePercentage(nodePartition)
-            * effectiveUserLimit * userLimitFactor, minimumAllocation);
-    return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
-        userAMLimit, getAMResourceLimitPerPartition(nodePartition))
-        ? userAMLimit
-        : getAMResourceLimitPerPartition(nodePartition);
-  }
-
-  public synchronized Resource calculateAndGetAMResourceLimitPerPartition(
+    try {
+      readLock.lock();
+      /*
+       * The user am resource limit is based on the same approach as the user
+       * limit (as it should represent a subset of that). This means that it uses
+       * the absolute queue capacity (per partition) instead of the max and is
+       * modified by the userlimit and the userlimit factor as is the userlimit
+       */
+      float effectiveUserLimit = Math.max(userLimit / 100.0f,
+          1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
+
+      Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
+          resourceCalculator,
+          labelManager.getResourceByLabel(nodePartition, lastClusterResource),
+          queueCapacities.getAbsoluteCapacity(nodePartition),
+          minimumAllocation);
+
+      Resource userAMLimit = Resources.multiplyAndNormalizeUp(
+          resourceCalculator, queuePartitionResource,
+          queueCapacities.getMaxAMResourcePercentage(nodePartition)
+              * effectiveUserLimit * userLimitFactor, minimumAllocation);
+      return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
+          userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ?
+          userAMLimit :
+          getAMResourceLimitPerPartition(nodePartition);
+    } finally {
+      readLock.unlock();
+    }
+
+  }
+
+  public Resource calculateAndGetAMResourceLimitPerPartition(
       String nodePartition) {
-    /*
-     * For non-labeled partition, get the max value from resources currently
-     * available to the queue and the absolute resources guaranteed for the
-     * partition in the queue. For labeled partition, consider only the absolute
-     * resources guaranteed. Multiply this value (based on labeled/
-     * non-labeled), * with per-partition am-resource-percent to get the max am
-     * resource limit for this queue and partition.
-     */
-    Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
-        resourceCalculator,
-        labelManager.getResourceByLabel(nodePartition, lastClusterResource),
-        queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation);
-
-    Resource queueCurrentLimit = Resources.none();
-    // For non-labeled partition, we need to consider the current queue
-    // usage limit.
-    if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      synchronized (queueResourceLimitsInfo) {
-        queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
+    try {
+      writeLock.lock();
+      /*
+       * For non-labeled partition, get the max value from resources currently
+       * available to the queue and the absolute resources guaranteed for the
+       * partition in the queue. For labeled partition, consider only the absolute
+       * resources guaranteed. Multiply this value (based on labeled/
+       * non-labeled), * with per-partition am-resource-percent to get the max am
+       * resource limit for this queue and partition.
+       */
+      Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
+          resourceCalculator,
+          labelManager.getResourceByLabel(nodePartition, lastClusterResource),
+          queueCapacities.getAbsoluteCapacity(nodePartition),
+          minimumAllocation);
+
+      Resource queueCurrentLimit = Resources.none();
+      // For non-labeled partition, we need to consider the current queue
+      // usage limit.
+      if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+        synchronized (queueResourceLimitsInfo){
+          queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
+        }
       }
-    }
 
-    float amResourcePercent = queueCapacities
-        .getMaxAMResourcePercentage(nodePartition);
+      float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(
+          nodePartition);
 
-    // Current usable resource for this queue and partition is the max of
-    // queueCurrentLimit and queuePartitionResource.
-    Resource queuePartitionUsableResource = Resources.max(resourceCalculator,
-        lastClusterResource, queueCurrentLimit, queuePartitionResource);
+      // Current usable resource for this queue and partition is the max of
+      // queueCurrentLimit and queuePartitionResource.
+      Resource queuePartitionUsableResource = Resources.max(resourceCalculator,
+          lastClusterResource, queueCurrentLimit, queuePartitionResource);
 
-    Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
-        resourceCalculator, queuePartitionUsableResource, amResourcePercent,
-        minimumAllocation);
+      Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
+          resourceCalculator, queuePartitionUsableResource, amResourcePercent,
+          minimumAllocation);
 
-    metrics.setAMResouceLimit(amResouceLimit);
-    queueUsage.setAMLimit(nodePartition, amResouceLimit);
-    return amResouceLimit;
+      metrics.setAMResouceLimit(amResouceLimit);
+      queueUsage.setAMLimit(nodePartition, amResouceLimit);
+      return amResouceLimit;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  private synchronized void activateApplications() {
-    // limit of allowed resource usage for application masters
-    Map<String, Resource> userAmPartitionLimit =
-        new HashMap<String, Resource>();
-
-    // AM Resource Limit for accessible labels can be pre-calculated.
-    // This will help in updating AMResourceLimit for all labels when queue
-    // is initialized for the first time (when no applications are present).
-    for (String nodePartition : getNodeLabelsForQueue()) {
-      calculateAndGetAMResourceLimitPerPartition(nodePartition);
-    }
+  private void activateApplications() {
+    try {
+      writeLock.lock();
+      // limit of allowed resource usage for application masters
+      Map<String, Resource> userAmPartitionLimit =
+          new HashMap<String, Resource>();
+
+      // AM Resource Limit for accessible labels can be pre-calculated.
+      // This will help in updating AMResourceLimit for all labels when queue
+      // is initialized for the first time (when no applications are present).
+      for (String nodePartition : getNodeLabelsForQueue()) {
+        calculateAndGetAMResourceLimitPerPartition(nodePartition);
+      }
 
-    for (Iterator<FiCaSchedulerApp> fsApp =
-        getPendingAppsOrderingPolicy().getAssignmentIterator();
-        fsApp.hasNext();) {
-      FiCaSchedulerApp application = fsApp.next();
-      ApplicationId applicationId = application.getApplicationId();
+      for (Iterator<FiCaSchedulerApp> fsApp =
+           getPendingAppsOrderingPolicy().getAssignmentIterator();
+           fsApp.hasNext(); ) {
+        FiCaSchedulerApp application = fsApp.next();
+        ApplicationId applicationId = application.getApplicationId();
 
-      // Get the am-node-partition associated with each application
-      // and calculate max-am resource limit for this partition.
-      String partitionName = application.getAppAMNodePartitionName();
+        // Get the am-node-partition associated with each application
+        // and calculate max-am resource limit for this partition.
+        String partitionName = application.getAppAMNodePartitionName();
 
-      Resource amLimit = getAMResourceLimitPerPartition(partitionName);
-      // Verify whether we already calculated am-limit for this label.
-      if (amLimit == null) {
-        amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
-      }
-      // Check am resource limit.
-      Resource amIfStarted = Resources.add(
-          application.getAMResource(partitionName),
-          queueUsage.getAMUsed(partitionName));
+        Resource amLimit = getAMResourceLimitPerPartition(partitionName);
+        // Verify whether we already calculated am-limit for this label.
+        if (amLimit == null) {
+          amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
+        }
+        // Check am resource limit.
+        Resource amIfStarted = Resources.add(
+            application.getAMResource(partitionName),
+            queueUsage.getAMUsed(partitionName));
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("application "+application.getId() +" AMResource "
-            + application.getAMResource(partitionName)
-            + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
-            + " amLimit " + amLimit + " lastClusterResource "
-            + lastClusterResource + " amIfStarted " + amIfStarted
-            + " AM node-partition name " + partitionName);
-      }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("application " + application.getId() + " AMResource "
+              + application.getAMResource(partitionName)
+              + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
+              + " amLimit " + amLimit + " lastClusterResource "
+              + lastClusterResource + " amIfStarted " + amIfStarted
+              + " AM node-partition name " + partitionName);
+        }
 
-      if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
-          amIfStarted, amLimit)) {
-        if (getNumActiveApplications() < 1
-            || (Resources.lessThanOrEqual(resourceCalculator,
-                lastClusterResource, queueUsage.getAMUsed(partitionName),
-                Resources.none()))) {
-          LOG.warn("maximum-am-resource-percent is insufficient to start a"
-              + " single application in queue, it is likely set too low."
-              + " skipping enforcement to allow at least one application"
-              + " to start");
-        } else {
-          application.updateAMContainerDiagnostics(AMState.INACTIVATED,
-              CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
-          LOG.info("Not activating application " + applicationId
-              + " as  amIfStarted: " + amIfStarted + " exceeds amLimit: "
-              + amLimit);
-          continue;
+        if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
+            amIfStarted, amLimit)) {
+          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
+              resourceCalculator, lastClusterResource,
+              queueUsage.getAMUsed(partitionName), Resources.none()))) {
+            LOG.warn("maximum-am-resource-percent is insufficient to start a"
+                + " single application in queue, it is likely set too low."
+                + " skipping enforcement to allow at least one application"
+                + " to start");
+          } else{
+            application.updateAMContainerDiagnostics(AMState.INACTIVATED,
+                CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
+            LOG.info("Not activating application " + applicationId
+                + " as  amIfStarted: " + amIfStarted + " exceeds amLimit: "
+                + amLimit);
+            continue;
+          }
         }
-      }
 
-      // Check user am resource limit
-      User user = getUser(application.getUser());
-      Resource userAMLimit = userAmPartitionLimit.get(partitionName);
+        // Check user am resource limit
+        User user = getUser(application.getUser());
+        Resource userAMLimit = userAmPartitionLimit.get(partitionName);
 
-      // Verify whether we already calculated user-am-limit for this label.
-      if (userAMLimit == null) {
-        userAMLimit = getUserAMResourceLimitPerPartition(partitionName);
-        userAmPartitionLimit.put(partitionName, userAMLimit);
-      }
+        // Verify whether we already calculated user-am-limit for this label.
+        if (userAMLimit == null) {
+          userAMLimit = getUserAMResourceLimitPerPartition(partitionName);
+          userAmPartitionLimit.put(partitionName, userAMLimit);
+        }
 
-      Resource userAmIfStarted = Resources.add(
-          application.getAMResource(partitionName),
-          user.getConsumedAMResources(partitionName));
-
-      if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
-          userAmIfStarted, userAMLimit)) {
-        if (getNumActiveApplications() < 1
-            || (Resources.lessThanOrEqual(resourceCalculator,
-                lastClusterResource, queueUsage.getAMUsed(partitionName),
-                Resources.none()))) {
-          LOG.warn("maximum-am-resource-percent is insufficient to start a"
-              + " single application in queue for user, it is likely set too"
-              + " low. skipping enforcement to allow at least one application"
-              + " to start");
-        } else {
-          application.updateAMContainerDiagnostics(AMState.INACTIVATED,
-              CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
-          LOG.info("Not activating application " + applicationId
-              + " for user: " + user + " as userAmIfStarted: "
-              + userAmIfStarted + " exceeds userAmLimit: " + userAMLimit);
-          continue;
+        Resource userAmIfStarted = Resources.add(
+            application.getAMResource(partitionName),
+            user.getConsumedAMResources(partitionName));
+
+        if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
+            userAmIfStarted, userAMLimit)) {
+          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
+              resourceCalculator, lastClusterResource,
+              queueUsage.getAMUsed(partitionName), Resources.none()))) {
+            LOG.warn("maximum-am-resource-percent is insufficient to start a"
+                + " single application in queue for user, it is likely set too"
+                + " low. skipping enforcement to allow at least one application"
+                + " to start");
+          } else{
+            application.updateAMContainerDiagnostics(AMState.INACTIVATED,
+                CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
+            LOG.info(
+                "Not activating application " + applicationId + " for user: "
+                    + user + " as userAmIfStarted: " + userAmIfStarted
+                    + " exceeds userAmLimit: " + userAMLimit);
+            continue;
+          }
         }
+        user.activateApplication();
+        orderingPolicy.addSchedulableEntity(application);
+        application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
+
+        queueUsage.incAMUsed(partitionName,
+            application.getAMResource(partitionName));
+        user.getResourceUsage().incAMUsed(partitionName,
+            application.getAMResource(partitionName));
+        user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
+        metrics.incAMUsed(application.getUser(),
+            application.getAMResource(partitionName));
+        metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
+        fsApp.remove();
+        LOG.info("Application " + applicationId + " from user: " + application
+            .getUser() + " activated in queue: " + getQueueName());
       }
-      user.activateApplication();
-      orderingPolicy.addSchedulableEntity(application);
-      application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
-
-      queueUsage.incAMUsed(partitionName,
-          application.getAMResource(partitionName));
-      user.getResourceUsage().incAMUsed(partitionName,
-          application.getAMResource(partitionName));
-      user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
-      metrics.incAMUsed(application.getUser(),
-          application.getAMResource(partitionName));
-      metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
-      fsApp.remove();
-      LOG.info("Application " + applicationId + " from user: "
-          + application.getUser() + " activated in queue: " + getQueueName());
+    } finally {
+      writeLock.unlock();
     }
   }
   
-  private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
+  private void addApplicationAttempt(FiCaSchedulerApp application,
       User user) {
-    // Accept 
-    user.submitApplication();
-    getPendingAppsOrderingPolicy().addSchedulableEntity(application);
-    applicationAttemptMap.put(application.getApplicationAttemptId(), application);
+    try {
+      writeLock.lock();
+      // Accept
+      user.submitApplication();
+      getPendingAppsOrderingPolicy().addSchedulableEntity(application);
+      applicationAttemptMap.put(application.getApplicationAttemptId(),
+          application);
 
-    // Activate applications
-    activateApplications();
-    
-    LOG.info("Application added -" +
-        " appId: " + application.getApplicationId() +
-        " user: " + application.getUser() + "," +
-        " leaf-queue: " + getQueueName() +
-        " #user-pending-applications: " + user.getPendingApplications() +
-        " #user-active-applications: " + user.getActiveApplications() +
-        " #queue-pending-applications: " + getNumPendingApplications() +
-        " #queue-active-applications: " + getNumActiveApplications()
-        );
+      // Activate applications
+      activateApplications();
+
+      LOG.info(
+          "Application added -" + " appId: " + application.getApplicationId()
+              + " user: " + application.getUser() + "," + " leaf-queue: "
+              + getQueueName() + " #user-pending-applications: " + user
+              .getPendingApplications() + " #user-active-applications: " + user
+              .getActiveApplications() + " #queue-pending-applications: "
+              + getNumPendingApplications() + " #queue-active-applications: "
+              + getNumActiveApplications());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   @Override
@@ -767,49 +850,54 @@ public class LeafQueue extends AbstractCSQueue {
   @Override
   public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
     // Careful! Locking order is important!
-    synchronized (this) {
-      removeApplicationAttempt(application, getUser(application.getUser()));
-    }
+    removeApplicationAttempt(application, application.getUser());
     getParent().finishApplicationAttempt(application, queue);
   }
 
-  public synchronized void removeApplicationAttempt(
-      FiCaSchedulerApp application, User user) {
-    String partitionName = application.getAppAMNodePartitionName();
-    boolean wasActive =
-      orderingPolicy.removeSchedulableEntity(application);
-    if (!wasActive) {
-      pendingOrderingPolicy.removeSchedulableEntity(application);
-    } else {
-      queueUsage.decAMUsed(partitionName,
-          application.getAMResource(partitionName));
-      user.getResourceUsage().decAMUsed(partitionName,
-          application.getAMResource(partitionName));
-      metrics.decAMUsed(application.getUser(),
-          application.getAMResource(partitionName));
-    }
-    applicationAttemptMap.remove(application.getApplicationAttemptId());
+  private void removeApplicationAttempt(
+      FiCaSchedulerApp application, String userName) {
+    try {
+      writeLock.lock();
 
-    user.finishApplication(wasActive);
-    if (user.getTotalApplications() == 0) {
-      users.remove(application.getUser());
-    }
+      // TODO, should use getUser, use this method just to avoid UT failure
+      // which is caused by wrong invoking order, will fix UT separately
+      User user = getUserAndAddIfAbsent(userName);
+
+      String partitionName = application.getAppAMNodePartitionName();
+      boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
+      if (!wasActive) {
+        pendingOrderingPolicy.removeSchedulableEntity(application);
+      } else{
+        queueUsage.decAMUsed(partitionName,
+            application.getAMResource(partitionName));
+        user.getResourceUsage().decAMUsed(partitionName,
+            application.getAMResource(partitionName));
+        metrics.decAMUsed(application.getUser(),
+            application.getAMResource(partitionName));
+      }
+      applicationAttemptMap.remove(application.getApplicationAttemptId());
+
+      user.finishApplication(wasActive);
+      if (user.getTotalApplications() == 0) {
+        users.remove(application.getUser());
+      }
 
-    // Check if we can activate more applications
-    activateApplications();
+      // Check if we can activate more applications
+      activateApplications();
 
-    LOG.info("Application removed -" +
-        " appId: " + application.getApplicationId() +
-        " user: " + application.getUser() +
-        " queue: " + getQueueName() +
-        " #user-pending-applications: " + user.getPendingApplications() +
-        " #user-active-applications: " + user.getActiveApplications() +
-        " #queue-pending-applications: " + getNumPendingApplications() +
-        " #queue-active-applications: " + getNumActiveApplications()
-    );
+      LOG.info(
+          "Application removed -" + " appId: " + application.getApplicationId()
+              + " user: " + application.getUser() + " queue: " + getQueueName()
+              + " #user-pending-applications: " + user.getPendingApplications()
+              + " #user-active-applications: " + user.getActiveApplications()
+              + " #queue-pending-applications: " + getNumPendingApplications()
+              + " #queue-active-applications: " + getNumActiveApplications());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  private synchronized FiCaSchedulerApp getApplication(
+  private FiCaSchedulerApp getApplication(
       ApplicationAttemptId applicationAttemptId) {
     return applicationAttemptMap.get(applicationAttemptId);
   }
@@ -871,170 +959,171 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized CSAssignment assignContainers(Resource clusterResource,
+  public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode) {
-    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+    try {
+      writeLock.lock();
+      updateCurrentResourceLimits(currentResourceLimits, clusterResource);
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: node=" + node.getNodeName()
-          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
-    }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "assignContainers: node=" + node.getNodeName() + " #applications="
+                + orderingPolicy.getNumSchedulableEntities());
+      }
 
-    setPreemptionAllowed(currentResourceLimits, node.getPartition());
+      setPreemptionAllowed(currentResourceLimits, node.getPartition());
 
-    // Check for reserved resources
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (reservedContainer != null) {
-      FiCaSchedulerApp application =
-          getApplication(reservedContainer.getApplicationAttemptId());
+      // Check for reserved resources
+      RMContainer reservedContainer = node.getReservedContainer();
+      if (reservedContainer != null) {
+        FiCaSchedulerApp application = getApplication(
+            reservedContainer.getApplicationAttemptId());
 
-      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-          node.getNodeID(), SystemClock.getInstance().getTime(), application);
+        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+            node.getNodeID(), SystemClock.getInstance().getTime(), application);
 
-      synchronized (application) {
-        CSAssignment assignment =
-            application.assignContainers(clusterResource, node,
-                currentResourceLimits, schedulingMode, reservedContainer);
+        CSAssignment assignment = application.assignContainers(clusterResource,
+            node, currentResourceLimits, schedulingMode, reservedContainer);
         handleExcessReservedContainer(clusterResource, assignment, node,
             application);
         killToPreemptContainers(clusterResource, node, assignment);
         return assignment;
       }
-    }
 
-    // if our queue cannot access this node, just return
-    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !accessibleToPartition(node.getPartition())) {
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-          getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
-          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
-              .getPartition());
-      return CSAssignment.NULL_ASSIGNMENT;
-    }
-
-    // Check if this queue need more resource, simply skip allocation if this
-    // queue doesn't need more resources.
-    if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
-        schedulingMode)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skip this queue=" + getQueuePath()
-            + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + node.getPartition());
+      // if our queue cannot access this node, just return
+      if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+          && !accessibleToPartition(node.getPartition())) {
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
+            ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
+                .getPartition());
+        return CSAssignment.NULL_ASSIGNMENT;
       }
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-          getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
-      return CSAssignment.NULL_ASSIGNMENT;
-    }
-
-    for (Iterator<FiCaSchedulerApp> assignmentIterator =
-        orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) {
-      FiCaSchedulerApp application = assignmentIterator.next();
-
-      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-          node.getNodeID(), SystemClock.getInstance().getTime(), application);
 
-      // Check queue max-capacity limit
-      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-          currentResourceLimits, application.getCurrentReservation(),
+      // Check if this queue need more resource, simply skip allocation if this
+      // queue doesn't need more resources.
+      if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
           schedulingMode)) {
-        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
-            activitiesManager, node,
-            application, application.getPriority(),
-            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip this queue=" + getQueuePath()
+              + ", because it doesn't need more resource, schedulingMode="
+              + schedulingMode.name() + " node-partition=" + node
+              .getPartition());
+        }
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
             getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-            ActivityDiagnosticConstant.EMPTY);
+            ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
         return CSAssignment.NULL_ASSIGNMENT;
       }
 
-      Resource userLimit =
-          computeUserLimitAndSetHeadroom(application, clusterResource,
-              node.getPartition(), schedulingMode);
-
-      // Check user limit
-      if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-          application, node.getPartition(), currentResourceLimits)) {
-        application.updateAMContainerDiagnostics(AMState.ACTIVATED,
-            "User capacity has reached its maximum limit.");
-        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
-            activitiesManager, node,
-            application, application.getPriority(),
-            ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT);
-        continue;
-      }
+      for (Iterator<FiCaSchedulerApp> assignmentIterator =
+           orderingPolicy.getAssignmentIterator();
+           assignmentIterator.hasNext(); ) {
+        FiCaSchedulerApp application = assignmentIterator.next();
+
+        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+            node.getNodeID(), SystemClock.getInstance().getTime(), application);
+
+        // Check queue max-capacity limit
+        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+            currentResourceLimits, application.getCurrentReservation(),
+            schedulingMode)) {
+          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+              activitiesManager, node, application, application.getPriority(),
+              ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+              getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+              ActivityDiagnosticConstant.EMPTY);
+          return CSAssignment.NULL_ASSIGNMENT;
+        }
 
-      // Try to schedule
-      CSAssignment assignment =
-          application.assignContainers(clusterResource, node,
-              currentResourceLimits, schedulingMode, null);
+        Resource userLimit = computeUserLimitAndSetHeadroom(application,
+            clusterResource, node.getPartition(), schedulingMode);
+
+        // Check user limit
+        if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
+            application, node.getPartition(), currentResourceLimits)) {
+          application.updateAMContainerDiagnostics(AMState.ACTIVATED,
+              "User capacity has reached its maximum limit.");
+          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+              activitiesManager, node, application, application.getPriority(),
+              ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT);
+          continue;
+        }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("post-assignContainers for application "
-            + application.getApplicationId());
-        application.showRequests();
-      }
+        // Try to schedule
+        CSAssignment assignment = application.assignContainers(clusterResource,
+            node, currentResourceLimits, schedulingMode, null);
 
-      // Did we schedule or reserve a container?
-      Resource assigned = assignment.getResource();
-      
-      handleExcessReservedContainer(clusterResource, assignment, node,
-          application);
-      killToPreemptContainers(clusterResource, node, assignment);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("post-assignContainers for application " + application
+              .getApplicationId());
+          application.showRequests();
+        }
 
-      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
-          Resources.none())) {
-        // Get reserved or allocated container from application
-        RMContainer reservedOrAllocatedRMContainer =
-            application.getRMContainer(assignment.getAssignmentInformation()
-                .getFirstAllocatedOrReservedContainerId());
+        // Did we schedule or reserve a container?
+        Resource assigned = assignment.getResource();
 
-        // Book-keeping
-        // Note: Update headroom to account for current allocation too...
-        allocateResource(clusterResource, application, assigned,
-            node.getPartition(), reservedOrAllocatedRMContainer,
-            assignment.isIncreasedAllocation());
-
-        // Update reserved metrics
-        Resource reservedRes = assignment.getAssignmentInformation()
-            .getReserved();
-        if (reservedRes != null && !reservedRes.equals(Resources.none())) {
-          incReservedResource(node.getPartition(), reservedRes);
-        }
+        handleExcessReservedContainer(clusterResource, assignment, node,
+            application);
+        killToPreemptContainers(clusterResource, node, assignment);
 
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED,
-            ActivityDiagnosticConstant.EMPTY);
+        if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
+            Resources.none())) {
+          // Get reserved or allocated container from application
+          RMContainer reservedOrAllocatedRMContainer =
+              application.getRMContainer(assignment.getAssignmentInformation()
+                  .getFirstAllocatedOrReservedContainerId());
+
+          // Book-keeping
+          // Note: Update headroom to account for current allocation too...
+          allocateResource(clusterResource, application, assigned,
+              node.getPartition(), reservedOrAllocatedRMContainer,
+              assignment.isIncreasedAllocation());
+
+          // Update reserved metrics
+          Resource reservedRes =
+              assignment.getAssignmentInformation().getReserved();
+          if (reservedRes != null && !reservedRes.equals(Resources.none())) {
+            incReservedResource(node.getPartition(), reservedRes);
+          }
 
-        // Done
-        return assignment;
-      } else if (assignment.getSkippedType()
-          == CSAssignment.SkippedType.OTHER) {
-        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-            activitiesManager, application.getApplicationId(),
-            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
-        application.updateNodeInfoForAMDiagnostics(node);
-      } else if(assignment.getSkippedType()
-          == CSAssignment.SkippedType.QUEUE_LIMIT) {
-        return assignment;
-      } else {
-        // If we don't allocate anything, and it is not skipped by application,
-        // we will return to respect FIFO of applications
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-            ActivityDiagnosticConstant.RESPECT_FIFO);
-        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-            activitiesManager, application.getApplicationId(),
-            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
-        return CSAssignment.NULL_ASSIGNMENT;
+          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+              getParent().getQueueName(), getQueueName(),
+              ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+
+          // Done
+          return assignment;
+        } else if (assignment.getSkippedType()
+            == CSAssignment.SkippedType.OTHER) {
+          ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+              activitiesManager, application.getApplicationId(),
+              ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+          application.updateNodeInfoForAMDiagnostics(node);
+        } else if (assignment.getSkippedType()
+            == CSAssignment.SkippedType.QUEUE_LIMIT) {
+          return assignment;
+        } else{
+          // If we don't allocate anything, and it is not skipped by application,
+          // we will return to respect FIFO of applications
+          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+              getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+              ActivityDiagnosticConstant.RESPECT_FIFO);
+          ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+              activitiesManager, application.getApplicationId(),
+              ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+          return CSAssignment.NULL_ASSIGNMENT;
+        }
       }
-    }
-    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-        getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-        ActivityDiagnosticConstant.EMPTY);
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+          ActivityDiagnosticConstant.EMPTY);
 
-    return CSAssignment.NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,
@@ -1109,7 +1198,8 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-  @Lock({LeafQueue.class, FiCaSchedulerApp.class})
+  // It doesn't necessarily to hold application's lock here.
+  @Lock({LeafQueue.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
       Resource clusterResource, String nodePartition,
       SchedulingMode schedulingMode) {
@@ -1281,51 +1371,53 @@ public class LeafQueue extends AbstractCSQueue {
   }
   
   @Private
-  protected synchronized boolean canAssignToUser(Resource clusterResource,
+  protected boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
       String nodePartition, ResourceLimits currentResourceLimits) {
-    User user = getUser(userName);
-
-    currentResourceLimits.setAmountNeededUnreserve(Resources.none());
-
-    // Note: We aren't considering the current request since there is a fixed
-    // overhead of the AM, but it's a > check, not a >= check, so...
-    if (Resources
-        .greaterThan(resourceCalculator, clusterResource,
-            user.getUsed(nodePartition),
-            limit)) {
-      // if enabled, check to see if could we potentially use this node instead
-      // of a reserved node if the application has reserved containers
-      if (this.reservationsContinueLooking &&
-          nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
-        if (Resources.lessThanOrEqual(
-            resourceCalculator,
-            clusterResource,
-            Resources.subtract(user.getUsed(),
-                application.getCurrentReservation()), limit)) {
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("User " + userName + " in queue " + getQueueName()
-                + " will exceed limit based on reservations - " + " consumed: "
-                + user.getUsed() + " reserved: "
-                + application.getCurrentReservation() + " limit: " + limit);
+    try {
+      readLock.lock();
+      User user = getUser(userName);
+
+      currentResourceLimits.setAmountNeededUnreserve(Resources.none());
+
+      // Note: We aren't considering the current request since there is a fixed
+      // overhead of the AM, but it's a > check, not a >= check, so...
+      if (Resources.greaterThan(resourceCalculator, clusterResource,
+          user.getUsed(nodePartition), limit)) {
+        // if enabled, check to see if could we potentially use this node instead
+        // of a reserved node if the application has reserved containers
+        if (this.reservationsContinueLooking && nodePartition.equals(
+            CommonNodeLabelsManager.NO_LABEL)) {
+          if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
+              Resources.subtract(user.getUsed(),
+                  application.getCurrentReservation()), limit)) {
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("User " + userName + " in queue " + getQueueName()
+                  + " will exceed limit based on reservations - "
+                  + " consumed: " + user.getUsed() + " reserved: " + application
+                  .getCurrentReservation() + " limit: " + limit);
+            }
+            Resource amountNeededToUnreserve = Resources.subtract(
+                user.getUsed(nodePartition), limit);
+            // we can only acquire a new container if we unreserve first to
+            // respect user-limit
+            currentResourceLimits.setAmountNeededUnreserve(
+                amountNeededToUnreserve);
+            return true;
           }
-          Resource amountNeededToUnreserve =
-              Resources.subtract(user.getUsed(nodePartition), limit);
-          // we can only acquire a new container if we unreserve first to
-          // respect user-limit
-          currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
-          return true;
         }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User " + userName + " in queue " + getQueueName()
+              + " will exceed limit - " + " consumed: " + user
+              .getUsed(nodePartition) + " limit: " + limit);
+        }
+        return false;
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("User " + userName + " in queue " + getQueueName()
-            + " will exceed limit - " + " consumed: "
-            + user.getUsed(nodePartition) + " limit: " + limit);
-      }
-      return false;
+      return true;
+    } finally {
+      readLock.unlock();
     }
-    return true;
   }
   
   @Override
@@ -1333,15 +1425,15 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) {
     boolean removed = false;
     Priority priority = null;
-    
-    synchronized (this) {
+
+    try {
+      writeLock.lock();
       if (rmContainer.getContainer() != null) {
         priority = rmContainer.getContainer().getPriority();
       }
 
       if (null != priority) {
-        removed = app.unreserve(
-            rmContainer.getAllocatedSchedulerKey(), node,
+        removed = app.unreserve(rmContainer.getAllocatedSchedulerKey(), node,
             rmContainer);
       }
 
@@ -1352,8 +1444,10 @@ public class LeafQueue extends AbstractCSQueue {
         releaseResource(clusterResource, app, rmContainer.getReservedResource(),
             node.getPartition(), rmContainer, true);
       }
+    } finally {
+      writeLock.unlock();
     }
-    
+
     if (removed) {
       getParent().unreserveIncreasedContainer(clusterResource, app, node,
           rmContainer);
@@ -1380,42 +1474,52 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-  private synchronized float calculateUserUsageRatio(Resource clusterResource,
+  private float calculateUserUsageRatio(Resource clusterResource,
       String nodePartition) {
-    Resource resourceByLabel =
-        labelManager.getResourceByLabel(nodePartition, clusterResource);
-    float consumed = 0;
-    User user;
-    for (Map.Entry<String, User> entry : users.entrySet()) {
-      user = entry.getValue();
-      consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
-          resourceByLabel, nodePartition);
+    try {
+      writeLock.lock();
+      Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+          clusterResource);
+      float consumed = 0;
+      User user;
+      for (Map.Entry<String, User> entry : users.entrySet()) {
+        user = entry.getValue();
+        consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
+            resourceByLabel, nodePartition);
+      }
+      return consumed;
+    } finally {
+      writeLock.unlock();
     }
-    return consumed;
   }
 
-  private synchronized void recalculateQueueUsageRatio(Resource clusterResource,
+  private void recalculateQueueUsageRatio(Resource clusterResource,
       String nodePartition) {
-    ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
-
-    if (nodePartition == null) {
-      for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
-          queueResourceUsage.getNodePartitionsSet())) {
-        qUsageRatios.setUsageRatio(partition,
-            calculateUserUsageRatio(clusterResource, partition));
+    try {
+      writeLock.lock();
+      ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
+
+      if (nodePartition == null) {
+        for (String partition : Sets.union(
+            queueCapacities.getNodePartitionsSet(),
+            queueResourceUsage.getNodePartitionsSet())) {
+          qUsageRatios.setUsageRatio(partition,
+              calculateUserUsageRatio(clusterResource, partition));
+        }
+      } else{
+        qUsageRatios.setUsageRatio(nodePartition,
+            calculateUserUsageRatio(clusterResource, nodePartition));
       }
-    } else {
-      qUsageRatios.setUsageRatio(nodePartition,
-          calculateUserUsageRatio(clusterResource, nodePartition));
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  private synchronized void updateQueueUsageRatio(String nodePartition,
+  private void updateQueueUsageRatio(String nodePartition,
       float delta) {
     qUsageRatios.incUsageRatio(nodePartition, delta);
   }
 
-
   @Override
   public void completedContainer(Resource clusterResource, 
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
@@ -1438,21 +1542,20 @@ public class LeafQueue extends AbstractCSQueue {
       boolean removed = false;
 
       // Careful! Locking order is important!
-      synchronized (this) {
-
+      try {
+        writeLock.lock();
         Container container = rmContainer.getContainer();
 
         // Inform the application & the node
         // Note: It's safe to assume that all state changes to RMContainer
-        // happen under scheduler's lock... 
+        // happen under scheduler's lock...
         // So, this is, in effect, a transaction across application & node
         if (rmContainer.getState() == RMContainerState.RESERVED) {
           removed = application.unreserve(rmContainer.getReservedSchedulerKey(),
               node, rmContainer);
-        } else {
-          removed =
-              application.containerCompleted(rmContainer, containerStatus,
-                  event, node.getPartition());
+        } else{
+          removed = application.containerCompleted(rmContainer, containerStatus,
+              event, node.getPartition());
 
           node.releaseContainer(container);
         }
@@ -1462,12 +1565,15 @@ public class LeafQueue extends AbstractCSQueue {
 
           // Inform the ordering policy
           orderingPolicy.containerReleased(application, rmContainer);
-          
+
           releaseResource(clusterResource, application, container.getResource(),
               node.getPartition(), rmContainer, false);
         }
+      } finally {
+        writeLock.unlock();
       }
 
+
       if (removed) {
         // Inform the parent queue _outside_ of the leaf-queue lock
         getParent().completedContainer(clusterResource, application, node,
@@ -1480,91 +1586,104 @@ public class LeafQueue extends AbstractCSQueue {
         new KillableContainer(rmContainer, node.getPartition(), queueName));
   }
 
-  synchronized void allocateResource(Resource clusterResource,
+  void allocateResource(Resource clusterResource,
       SchedulerApplicationAttempt application, Resource resource,
       String nodePartition, RMContainer rmContainer,
       boolean isIncreasedAllocation) {
-    super.allocateResource(clusterResource, resource, nodePartition,
-        isIncreasedAllocation);
-    Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
-        clusterResource);
-    
-    // handle ignore exclusivity container
-    if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
-        RMNodeLabelsManager.NO_LABEL)
-        && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      TreeSet<RMContainer> rmContainers = null;
-      if (null == (rmContainers =
-          ignorePartitionExclusivityRMContainers.get(nodePartition))) {
-        rmContainers = new TreeSet<>();
-        ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers);
+    try {
+      writeLock.lock();
+      super.allocateResource(clusterResource, resource, nodePartition,
+          isIncreasedAllocation);
+      Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+          clusterResource);
+
+      // handle ignore exclusivity container
+      if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
+          RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
+          RMNodeLabelsManager.NO_LABEL)) {
+        TreeSet<RMContainer> rmContainers = null;
+        if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get(
+            nodePartition))) {
+          rmContainers = new TreeSet<>();
+          ignorePartitionExclusivityRMContainers.put(nodePartition,
+              rmContainers);
+        }
+        rmContainers.add(rmContainer);
       }
-      rmContainers.add(rmContainer);
-    }
 
-    // Update user metrics
-    String userName = application.getUser();
-    User user = getUser(userName);
-    user.assignContainer(resource, nodePartition);
+      // Update user metrics
+      String userName = application.getUser();
 
-    // Update usage ratios
-    updateQueueUsageRatio(nodePartition,
-        user.updateUsageRatio(resourceCalculator, resourceByLabel,
-            nodePartition));
+      // TODO, should use getUser, use this method just to avoid UT failure
+      // which is caused by wrong invoking order, will fix UT separately
+      User user = getUserAndAddIfAbsent(userName);
 
-    // Note this is a bit unconventional since it gets the object and modifies
-    // it here, rather then using set routine
-    Resources.subtractFrom(application.getHeadroom(), resource); // headroom
-    metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(getQueueName() +
-          " user=" + userName +
-          " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
-          " headroom = " + application.getHeadroom() +
-          " user-resources=" + user.getUsed()
-          );
+      user.assignContainer(resource, nodePartition);
+
+      // Update usage ratios
+      updateQueueUsageRatio(nodePartition,
+          user.updateUsageRatio(resourceCalculator, resourceByLabel,
+              nodePartition));
+
+      // Note this is a bit unconventional since it gets the object and modifies
+      // it here, rather then using set routine
+      Resources.subtractFrom(application.getHeadroom(), resource); // headroom
+      metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage
+            .getUsed() + " numContainers=" + numContainers + " headroom = "
+            + application.getHeadroom() + " user-resources=" + user.getUsed());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  synchronized void releaseResource(Resource clusterResource,
+  void releaseResource(Resource clusterResource,
       FiCaSchedulerApp application, Resource resource, String nodePartition,
       RMContainer rmContainer, boolean isChangeResource) {
-    super.releaseResource(clusterResource, resource, nodePartition,
-        isChangeResource);
-    Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
-        clusterResource);
-    
-    // handle ignore exclusivity container
-    if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
-        RMNodeLabelsManager.NO_LABEL)
-        && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
-        Set<RMContainer> rmContainers =
-            ignorePartitionExclusivityRMContainers.get(nodePartition);
-        rmContainers.remove(rmContainer);
-        if (rmContainers.isEmpty()) {
-          ignorePartitionExclusivityRMContainers.remove(nodePartition);
+    try {
+      writeLock.lock();
+      super.releaseResource(clusterResource, resource, nodePartition,
+          isChangeResource);
+      Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+          clusterResource);
+
+      // handle ignore exclusivity container
+      if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
+          RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
+          RMNodeLabelsManager.NO_LABEL)) {
+        if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
+          Set<RMContainer> rmContainers =
+              ignorePartitionExclusivityRMContainers.get(nodePartition);
+          rmContainers.remove(rmContainer);
+          if (rmContainers.isEmpty()) {
+            ignorePartitionExclusivityRMContainers.remove(nodePartition);
+          }
         }
       }
-    }
 
-    // Update user metrics
-    String userName = application.getUser();
-    User user = getUser(userName);
-    user.releaseContainer(resource, nodePartition);
+      // Update user metrics
+      String userName = application.getUser();
+      User user = getUserAndAddIfAbsent(userName);
+      user.releaseContainer(resource, nodePartition);
 
-    // Update usage ratios
-    updateQueueUsageRatio(nodePartition,
-        user.updateUsageRatio(resourceCalculator, resourceByLabel,
-            nodePartition));
+      // Update usage ratios
+      updateQueueUsageRatio(nodePartition,
+          user.updateUsageRatio(resourceCalculator, resourceByLabel,
+              nodePartition));
 
-    metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
+      metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(getQueueName() +
-          " used=" + queueUsage.getUsed() + " numContainers=" + numContainers +
-          " user=" + userName + " user-resources=" + user.getUsed());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            getQueueName() + " used=" + queueUsage.getUsed() + " numContainers="
+                + numContainers + " user=" + userName + " user-resources="
+                + user.getUsed());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
   
@@ -1589,35 +1708,38 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized void updateClusterResource(Resource clusterResource,
+  public void updateClusterResource(Resource clusterResource,
       ResourceLimits currentResourceLimits) {
-    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-    lastClusterResource = clusterResource;
-    
-    // Update headroom info based on new cluster resource value
-    // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
-    // during allocation
-    setQueueResourceLimitsInfo(clusterResource);
+    try {
+      writeLock.lock();
+      updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+      lastClusterResource = clusterResource;
 
-    // Update user consumedRatios
-    recalculateQueueUsageRatio(clusterResource, null);
+      // Update headroom info based on new cluster resource value
+      // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
+      // during allocation
+      setQueueResourceLimitsInfo(clusterResource);
 
-    // Update metrics
-    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
-        minimumAllocation, this, labelManager, null);
+      // Update user consumedRatios
+      recalculateQueueUsageRatio(clusterResource, null);
 
-    // queue metrics are updated, more resource may be available
-    // activate the pending applications if possible
-    activateApplications();
+      // Update metrics
+      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+          minimumAllocation, this, labelManager, null);
 
-    // Update application properties
-    for (FiCaSchedulerApp application :
-      orderingPolicy.getSchedulableEntities()) {
-      synchronized (application) {
+      // queue metrics are updated, more resource may be available
+      // activate the pending applications if possible
+      activateApplications();
+
+      // Update application properties
+      for (FiCaSchedulerApp application : orderingPolicy
+          .getSchedulableEntities()) {
         computeUserLimitAndSetHeadroom(application, clusterResource,
             RMNodeLabelsManager.NO_LABEL,
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -1714,30 +1836,47 @@ public class LeafQueue extends AbstractCSQueue {
   public static class User {
     ResourceUsage userResourceUsage = new ResourceUsage();
     volatile Resource userResourceLimit = Resource.newInstance(0, 0);
-    int pendingApplications = 0;
-    int activeApplications = 0;
+    volatile int pendingApplications = 0;
+    volatile int activeApplications = 0;
     private UsageRatios userUsageRatios = new UsageRatios();
+    private WriteLock writeLock;
+
+    User() {
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      // Nobody uses read-lock now, will add it when necessary
+      writeLock = lock.writeLock();
+    }
 
     public ResourceUsage getResourceUsage() {
       return userResourceUsage;
     }
     
-    public synchronized float resetAndUpdateUsageRatio(
+    public float resetAndUpdateUsageRatio(
         ResourceCalculator resourceCalculator,
         Resource resource, String nodePartition) {
-      userUsageRatios.setUsageRatio(nodePartition, 0);
-      return updateUsageRatio(resourceCalculator, resource, nodePartition);
+      try {
+        writeLock.lock();
+        userUsageRatios.setUsageRatio(nodePartition, 0);
+        return updateUsageRatio(resourceCalculator, resource, nodePartition);
+      } finally {
+        writeLock.unlock();
+      }
     }
 
-    public synchronized float updateUsageRatio(
+    public float updateUsageRatio(
         ResourceCalculator resourceCalculator,
         Resource resource, String nodePartition) {
-      float delta;
-      float newRatio =
-          Resources.ratio(resourceCalculator, getUsed(nodePartition), resource);
-      delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
-      userUsageRatios.setUsageRatio(nodePartition, newRatio);
-      return delta;
+      try {
+        writeLock.lock();
+        float delta;
+        float newRatio = Resources.ratio(resourceCalcula

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to