Repository: hadoop
Updated Branches:
  refs/heads/trunk e52d6e7a4 -> 2b66d9ec5


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 3e9785f..ffb6892 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -107,68 +107,77 @@ public class ParentQueue extends AbstractCSQueue {
         ", fullname=" + getQueuePath());
   }
 
-  synchronized void setupQueueConfigs(Resource clusterResource)
+  void setupQueueConfigs(Resource clusterResource)
       throws IOException {
-    super.setupQueueConfigs(clusterResource);
-    StringBuilder aclsString = new StringBuilder();
-    for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
-      aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
-    }
+    try {
+      writeLock.lock();
+      super.setupQueueConfigs(clusterResource);
+      StringBuilder aclsString = new StringBuilder();
+      for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
+        aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
+      }
 
-    StringBuilder labelStrBuilder = new StringBuilder(); 
-    if (accessibleLabels != null) {
-      for (String s : accessibleLabels) {
-        labelStrBuilder.append(s);
-        labelStrBuilder.append(",");
+      StringBuilder labelStrBuilder = new StringBuilder();
+      if (accessibleLabels != null) {
+        for (String s : accessibleLabels) {
+          labelStrBuilder.append(s);
+          labelStrBuilder.append(",");
+        }
       }
-    }
 
-    LOG.info(queueName +
-        ", capacity=" + this.queueCapacities.getCapacity() +
-        ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() +
-        ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() +
-        ", absoluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() +
-        ", state=" + state +
-        ", acls=" + aclsString + 
-        ", labels=" + labelStrBuilder.toString() + "\n" +
-        ", reservationsContinueLooking=" + reservationsContinueLooking);
+      LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity()
+          + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
+          + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
+          + ", absoluteMaxCapacity=" + this.queueCapacities
+          .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
+          + aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+          + ", reservationsContinueLooking=" + reservationsContinueLooking);
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   private static float PRECISION = 0.0005f; // 0.05% precision
-  synchronized void setChildQueues(Collection<CSQueue> childQueues) {
-    // Validate
-    float childCapacities = 0;
-    for (CSQueue queue : childQueues) {
-      childCapacities += queue.getCapacity();
-    }
-    float delta = Math.abs(1.0f - childCapacities);  // crude way to check
-    // allow capacities being set to 0, and enforce child 0 if parent is 0
-    if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || 
-        ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
-      throw new IllegalArgumentException("Illegal" +
-               " capacity of " + childCapacities + 
-               " for children of queue " + queueName);
-    }
-    // check label capacities
-    for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
-      float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
-      // check children's labels
-      float sum = 0;
+
+  void setChildQueues(Collection<CSQueue> childQueues) {
+    try {
+      writeLock.lock();
+      // Validate
+      float childCapacities = 0;
       for (CSQueue queue : childQueues) {
-        sum += queue.getQueueCapacities().getCapacity(nodeLabel);
+        childCapacities += queue.getCapacity();
       }
-      if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
-          || (capacityByLabel == 0) && (sum > 0)) {
-        throw new IllegalArgumentException("Illegal" + " capacity of "
-            + sum + " for children of queue " + queueName
-            + " for label=" + nodeLabel);
+      float delta = Math.abs(1.0f - childCapacities);  // crude way to check
+      // allow capacities being set to 0, and enforce child 0 if parent is 0
+      if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || (
+          (queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
+        throw new IllegalArgumentException(
+            "Illegal" + " capacity of " + childCapacities
+                + " for children of queue " + queueName);
       }
-    }
-    
-    this.childQueues.clear();
-    this.childQueues.addAll(childQueues);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("setChildQueues: " + getChildQueuesToPrint());
+      // check label capacities
+      for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
+        float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
+        // check children's labels
+        float sum = 0;
+        for (CSQueue queue : childQueues) {
+          sum += queue.getQueueCapacities().getCapacity(nodeLabel);
+        }
+        if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
+            || (capacityByLabel == 0) && (sum > 0)) {
+          throw new IllegalArgumentException(
+              "Illegal" + " capacity of " + sum + " for children of queue "
+                  + queueName + " for label=" + nodeLabel);
+        }
+      }
+
+      this.childQueues.clear();
+      this.childQueues.addAll(childQueues);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("setChildQueues: " + getChildQueuesToPrint());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
@@ -179,53 +188,70 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized QueueInfo getQueueInfo( 
+  public QueueInfo getQueueInfo(
       boolean includeChildQueues, boolean recursive) {
-    QueueInfo queueInfo = getQueueInfo();
-
-    List<QueueInfo> childQueuesInfo = new ArrayList<QueueInfo>();
-    if (includeChildQueues) {
-      for (CSQueue child : childQueues) {
-        // Get queue information recursively?
-        childQueuesInfo.add(
-            child.getQueueInfo(recursive, recursive));
+    try {
+      readLock.lock();
+      QueueInfo queueInfo = getQueueInfo();
+
+      List<QueueInfo> childQueuesInfo = new ArrayList<>();
+      if (includeChildQueues) {
+        for (CSQueue child : childQueues) {
+          // Get queue information recursively?
+          childQueuesInfo.add(child.getQueueInfo(recursive, recursive));
+        }
       }
+      queueInfo.setChildQueues(childQueuesInfo);
+
+      return queueInfo;
+    } finally {
+      readLock.unlock();
     }
-    queueInfo.setChildQueues(childQueuesInfo);
-    
-    return queueInfo;
+
   }
 
-  private synchronized QueueUserACLInfo getUserAclInfo(
+  private QueueUserACLInfo getUserAclInfo(
       UserGroupInformation user) {
-    QueueUserACLInfo userAclInfo = 
-      recordFactory.newRecordInstance(QueueUserACLInfo.class);
-    List<QueueACL> operations = new ArrayList<QueueACL>();
-    for (QueueACL operation : QueueACL.values()) {
-      if (hasAccess(operation, user)) {
-        operations.add(operation);
-      } 
+    try {
+      readLock.lock();
+      QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
+          QueueUserACLInfo.class);
+      List<QueueACL> operations = new ArrayList<QueueACL>();
+      for (QueueACL operation : QueueACL.values()) {
+        if (hasAccess(operation, user)) {
+          operations.add(operation);
+        }
+      }
+
+      userAclInfo.setQueueName(getQueueName());
+      userAclInfo.setUserAcls(operations);
+      return userAclInfo;
+    } finally {
+      readLock.unlock();
     }
 
-    userAclInfo.setQueueName(getQueueName());
-    userAclInfo.setUserAcls(operations);
-    return userAclInfo;
   }
   
   @Override
-  public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
+  public List<QueueUserACLInfo> getQueueUserAclInfo(
       UserGroupInformation user) {
-    List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
-    
-    // Add parent queue acls
-    userAcls.add(getUserAclInfo(user));
-    
-    // Add children queue acls
-    for (CSQueue child : childQueues) {
-      userAcls.addAll(child.getQueueUserAclInfo(user));
+    try {
+      readLock.lock();
+      List<QueueUserACLInfo> userAcls = new ArrayList<>();
+
+      // Add parent queue acls
+      userAcls.add(getUserAclInfo(user));
+
+      // Add children queue acls
+      for (CSQueue child : childQueues) {
+        userAcls.addAll(child.getQueueUserAclInfo(user));
+      }
+
+      return userAcls;
+    } finally {
+      readLock.unlock();
     }
- 
-    return userAcls;
+
   }
 
   public String toString() {
@@ -240,52 +266,59 @@ public class ParentQueue extends AbstractCSQueue {
   }
   
   @Override
-  public synchronized void reinitialize(CSQueue newlyParsedQueue,
+  public void reinitialize(CSQueue newlyParsedQueue,
       Resource clusterResource) throws IOException {
-    // Sanity check
-    if (!(newlyParsedQueue instanceof ParentQueue) ||
-        !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
-      throw new IOException("Trying to reinitialize " + getQueuePath() +
-          " from " + newlyParsedQueue.getQueuePath());
-    }
+    try {
+      writeLock.lock();
+      // Sanity check
+      if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue
+          .getQueuePath().equals(getQueuePath())) {
+        throw new IOException(
+            "Trying to reinitialize " + getQueuePath() + " from "
+                + newlyParsedQueue.getQueuePath());
+      }
 
-    ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue;
+      ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue;
 
-    // Set new configs
-    setupQueueConfigs(clusterResource);
+      // Set new configs
+      setupQueueConfigs(clusterResource);
 
-    // Re-configure existing child queues and add new ones
-    // The CS has already checked to ensure all existing child queues are present!
-    Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
-    Map<String, CSQueue> newChildQueues = 
-        getQueues(newlyParsedParentQueue.childQueues);
-    for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
-      String newChildQueueName = e.getKey();
-      CSQueue newChildQueue = e.getValue();
+      // Re-configure existing child queues and add new ones
+      // The CS has already checked to ensure all existing child queues are present!
+      Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
+      Map<String, CSQueue> newChildQueues = getQueues(
+          newlyParsedParentQueue.childQueues);
+      for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
+        String newChildQueueName = e.getKey();
+        CSQueue newChildQueue = e.getValue();
 
-      CSQueue childQueue = currentChildQueues.get(newChildQueueName);
-      
-      // Check if the child-queue already exists
-      if (childQueue != null) {
-        // Re-init existing child queues
-        childQueue.reinitialize(newChildQueue, clusterResource);
-        LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
-      } else {
-        // New child queue, do not re-init
-        
-        // Set parent to 'this'
-        newChildQueue.setParent(this);
-        
-        // Save in list of current child queues
-        currentChildQueues.put(newChildQueueName, newChildQueue);
-        
-        LOG.info(getQueueName() + ": added new child queue: " + newChildQueue);
+        CSQueue childQueue = currentChildQueues.get(newChildQueueName);
+
+        // Check if the child-queue already exists
+        if (childQueue != null) {
+          // Re-init existing child queues
+          childQueue.reinitialize(newChildQueue, clusterResource);
+          LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
+        } else{
+          // New child queue, do not re-init
+
+          // Set parent to 'this'
+          newChildQueue.setParent(this);
+
+          // Save in list of current child queues
+          currentChildQueues.put(newChildQueueName, newChildQueue);
+
+          LOG.info(
+              getQueueName() + ": added new child queue: " + newChildQueue);
+        }
       }
-    }
 
-    // Re-sort all queues
-    childQueues.clear();
-    childQueues.addAll(currentChildQueues.values());
+      // Re-sort all queues
+      childQueues.clear();
+      childQueues.addAll(currentChildQueues.values());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
@@ -299,21 +332,24 @@ public class ParentQueue extends AbstractCSQueue {
   @Override
   public void submitApplication(ApplicationId applicationId, String user,
       String queue) throws AccessControlException {
-    
-    synchronized (this) {
+
+    try {
+      writeLock.lock();
       // Sanity check
       if (queue.equals(queueName)) {
-        throw new AccessControlException("Cannot submit application " +
-            "to non-leaf queue: " + queueName);
+        throw new AccessControlException(
+            "Cannot submit application " + "to non-leaf queue: " + queueName);
       }
-      
+
       if (state != QueueState.RUNNING) {
-        throw new AccessControlException("Queue " + getQueuePath() +
-            " is STOPPED. Cannot accept submission of application: " +
-            applicationId);
+        throw new AccessControlException("Queue " + getQueuePath()
+            + " is STOPPED. Cannot accept submission of application: "
+            + applicationId);
       }
 
       addApplication(applicationId, user);
+    } finally {
+      writeLock.unlock();
     }
     
     // Inform the parent queue
@@ -342,24 +378,26 @@ public class ParentQueue extends AbstractCSQueue {
     // finish attempt logic.
   }
 
-  private synchronized void addApplication(ApplicationId applicationId,
+  private void addApplication(ApplicationId applicationId,
       String user) {
 
-    ++numApplications;
+    try {
+      writeLock.lock();
+      ++numApplications;
 
-    LOG.info("Application added -" +
-        " appId: " + applicationId + 
-        " user: " + user + 
-        " leaf-queue of parent: " + getQueueName() + 
-        " #applications: " + getNumApplications());
+      LOG.info(
+          "Application added -" + " appId: " + applicationId + " user: " + user
+              + " leaf-queue of parent: " + getQueueName() + " #applications: "
+              + getNumApplications());
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   @Override
   public void finishApplication(ApplicationId application, String user) {
-    
-    synchronized (this) {
-      removeApplication(application, user);
-    }
+
+    removeApplication(application, user);
     
     // Inform the parent queue
     if (parent != null) {
@@ -367,16 +405,18 @@ public class ParentQueue extends AbstractCSQueue {
     }
   }
 
-  private synchronized void removeApplication(ApplicationId applicationId, 
+  private void removeApplication(ApplicationId applicationId,
       String user) {
-    
-    --numApplications;
-
-    LOG.info("Application removed -" +
-        " appId: " + applicationId + 
-        " user: " + user + 
-        " leaf-queue of parent: " + getQueueName() + 
-        " #applications: " + getNumApplications());
+    try {
+      writeLock.lock();
+      --numApplications;
+
+      LOG.info("Application removed -" + " appId: " + applicationId + " user: "
+          + user + " leaf-queue of parent: " + getQueueName()
+          + " #applications: " + getNumApplications());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   private String getParentName() {
@@ -384,183 +424,181 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized CSAssignment assignContainers(Resource clusterResource,
+  public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits resourceLimits,
       SchedulingMode schedulingMode) {
-    // if our queue cannot access this node, just return
-    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !accessibleToPartition(node.getPartition())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skip this queue=" + getQueuePath()
-            + ", because it is not able to access partition=" + node
-            .getPartition());
-      }
-
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-          getParentName(), getQueueName(), ActivityState.REJECTED,
-          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
+    try {
+      writeLock.lock();
+      // if our queue cannot access this node, just return
+      if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+          && !accessibleToPartition(node.getPartition())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip this queue=" + getQueuePath()
+              + ", because it is not able to access partition=" + node
               .getPartition());
-      if (rootQueue) {
-        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
-            node);
-      }
+        }
 
-      return CSAssignment.NULL_ASSIGNMENT;
-    }
-    
-    // Check if this queue need more resource, simply skip allocation if this
-    // queue doesn't need more resources.
-    if (!super.hasPendingResourceRequest(node.getPartition(),
-        clusterResource, schedulingMode)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skip this queue=" + getQueuePath()
-            + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + node.getPartition());
-      }
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParentName(), getQueueName(), ActivityState.REJECTED,
+            ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
+                .getPartition());
+        if (rootQueue) {
+          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+              node);
+        }
 
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-          getParentName(), getQueueName(), ActivityState.SKIPPED,
-          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
-      if (rootQueue) {
-        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
-            node);
+        return CSAssignment.NULL_ASSIGNMENT;
       }
 
-      return CSAssignment.NULL_ASSIGNMENT;
-    }
-    
-    CSAssignment assignment = 
-        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-    
-    while (canAssign(clusterResource, node)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Trying to assign containers to child-queue of "
-          + getQueueName());
-      }
-      
-      // Are we over maximum-capacity for this queue?
-      // This will also consider parent's limits and also continuous reservation
-      // looking
-      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-          resourceLimits, Resources.createResource(
-              getMetrics().getReservedMB(), getMetrics()
-                  .getReservedVirtualCores()), schedulingMode)) {
+      // Check if this queue need more resource, simply skip allocation if this
+      // queue doesn't need more resources.
+      if (!super.hasPendingResourceRequest(node.getPartition(), clusterResource,
+          schedulingMode)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip this queue=" + getQueuePath()
+              + ", because it doesn't need more resource, schedulingMode="
+              + schedulingMode.name() + " node-partition=" + node
+              .getPartition());
+        }
 
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
             getParentName(), getQueueName(), ActivityState.SKIPPED,
-            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
+            ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
         if (rootQueue) {
           ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
               node);
         }
 
-        break;
+        return CSAssignment.NULL_ASSIGNMENT;
       }
 
-      // Schedule
-      CSAssignment assignedToChild =
-          assignContainersToChildQueues(clusterResource, node, resourceLimits,
-              schedulingMode);
-      assignment.setType(assignedToChild.getType());
-      
-      // Done if no child-queue assigned anything
-      if (Resources.greaterThan(
-              resourceCalculator, clusterResource, 
-              assignedToChild.getResource(), Resources.none())) {
+      CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0),
+          NodeType.NODE_LOCAL);
 
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            getParentName(), getQueueName(), ActivityState.ACCEPTED,
-            ActivityDiagnosticConstant.EMPTY);
+      while (canAssign(clusterResource, node)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Trying to assign containers to child-queue of "
+              + getQueueName());
+        }
 
-        if (node.getReservedContainer() == null) {
-          if (rootQueue) {
-            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
-                activitiesManager, node,
-                assignedToChild.getAssignmentInformation()
-                    .getFirstAllocatedOrReservedContainerId(),
-                AllocationState.ALLOCATED);
-          }
-        } else {
+        // Are we over maximum-capacity for this queue?
+        // This will also consider parent's limits and also continuous reservation
+        // looking
+        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+            resourceLimits, Resources
+                .createResource(getMetrics().getReservedMB(),
+                    getMetrics().getReservedVirtualCores()), schedulingMode)) {
+
+          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+              getParentName(), getQueueName(), ActivityState.SKIPPED,
+              ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
           if (rootQueue) {
-            ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
-                activitiesManager, node,
-                assignedToChild.getAssignmentInformation()
-                    .getFirstAllocatedOrReservedContainerId(),
-                AllocationState.RESERVED);
+            ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+                node);
           }
+
+          break;
         }
 
-        // Track resource utilization for the parent-queue
-        allocateResource(clusterResource, assignedToChild.getResource(),
-            node.getPartition(), assignedToChild.isIncreasedAllocation());
-        
-        // Track resource utilization in this pass of the scheduler
-        Resources
-          .addTo(assignment.getResource(), assignedToChild.getResource());
-        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-          assignedToChild.getAssignmentInformation().getAllocated());
-        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-            assignedToChild.getAssignmentInformation().getReserved());
-        assignment.getAssignmentInformation().incrAllocations(
-          assignedToChild.getAssignmentInformation().getNumAllocations());
-        assignment.getAssignmentInformation().incrReservations(
-          assignedToChild.getAssignmentInformation().getNumReservations());
-        assignment
-          .getAssignmentInformation()
-          .getAllocationDetails()
-          .addAll(
-              assignedToChild.getAssignmentInformation().getAllocationDetails());
-        assignment
-          .getAssignmentInformation()
-          .getReservationDetails()
-          .addAll(
+        // Schedule
+        CSAssignment assignedToChild = assignContainersToChildQueues(
+            clusterResource, node, resourceLimits, schedulingMode);
+        assignment.setType(assignedToChild.getType());
+
+        // Done if no child-queue assigned anything
+        if (Resources.greaterThan(resourceCalculator, clusterResource,
+            assignedToChild.getResource(), Resources.none())) {
+
+          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+              getParentName(), getQueueName(), ActivityState.ACCEPTED,
+              ActivityDiagnosticConstant.EMPTY);
+
+          if (node.getReservedContainer() == null) {
+            if (rootQueue) {
+              ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+                  activitiesManager, node,
+                  assignedToChild.getAssignmentInformation()
+                      .getFirstAllocatedOrReservedContainerId(),
+                  AllocationState.ALLOCATED);
+            }
+          } else{
+            if (rootQueue) {
+              ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
+                  activitiesManager, node,
+                  assignedToChild.getAssignmentInformation()
+                      .getFirstAllocatedOrReservedContainerId(),
+                  AllocationState.RESERVED);
+            }
+          }
+
+          // Track resource utilization for the parent-queue
+          allocateResource(clusterResource, assignedToChild.getResource(),
+              node.getPartition(), assignedToChild.isIncreasedAllocation());
+
+          // Track resource utilization in this pass of the scheduler
+          Resources.addTo(assignment.getResource(),
+              assignedToChild.getResource());
+          Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+              assignedToChild.getAssignmentInformation().getAllocated());
+          Resources.addTo(assignment.getAssignmentInformation().getReserved(),
+              assignedToChild.getAssignmentInformation().getReserved());
+          assignment.getAssignmentInformation().incrAllocations(
+              assignedToChild.getAssignmentInformation().getNumAllocations());
+          assignment.getAssignmentInformation().incrReservations(
+              assignedToChild.getAssignmentInformation().getNumReservations());
+          assignment.getAssignmentInformation().getAllocationDetails().addAll(
+              assignedToChild.getAssignmentInformation()
+                  .getAllocationDetails());
+          assignment.getAssignmentInformation().getReservationDetails().addAll(
               assignedToChild.getAssignmentInformation()
                   .getReservationDetails());
-        assignment.setIncreasedAllocation(assignedToChild
-            .isIncreasedAllocation());
-        
-        LOG.info("assignedContainer" +
-            " queue=" + getQueueName() + 
-            " usedCapacity=" + getUsedCapacity() +
-            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-            " used=" + queueUsage.getUsed() + 
-            " cluster=" + clusterResource);
-
-      } else {
-        assignment.setSkippedType(assignedToChild.getSkippedType());
+          assignment.setIncreasedAllocation(
+              assignedToChild.isIncreasedAllocation());
 
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            getParentName(), getQueueName(), ActivityState.SKIPPED,
-            ActivityDiagnosticConstant.EMPTY);
-        if (rootQueue) {
-          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
-              node);
-        }
+          LOG.info("assignedContainer" + " queue=" + getQueueName()
+              + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+              + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed()
+              + " cluster=" + clusterResource);
 
-        break;
-      }
+        } else{
+          assignment.setSkippedType(assignedToChild.getSkippedType());
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("ParentQ=" + getQueueName()
-          + " assignedSoFarInThisIteration=" + assignment.getResource()
-          + " usedCapacity=" + getUsedCapacity()
-          + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity());
-      }
+          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+              getParentName(), getQueueName(), ActivityState.SKIPPED,
+              ActivityDiagnosticConstant.EMPTY);
+          if (rootQueue) {
+            ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
+                node);
+          }
+
+          break;
+        }
 
-      // Do not assign more than one container if this isn't the root queue
-      // or if we've already assigned an off-switch container
-      if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
         if (LOG.isDebugEnabled()) {
-          if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
-            LOG.debug("Not assigning more than one off-switch container," +
-                " assignments so far: " + assignment);
+          LOG.debug(
+              "ParentQ=" + getQueueName() + " assignedSoFarInThisIteration="
+                  + assignment.getResource() + " usedCapacity="
+                  + getUsedCapacity() + " absoluteUsedCapacity="
+                  + getAbsoluteUsedCapacity());
+        }
+
+        // Do not assign more than one container if this isn't the root queue
+        // or if we've already assigned an off-switch container
+        if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
+          if (LOG.isDebugEnabled()) {
+            if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
+              LOG.debug("Not assigning more than one off-switch container,"
+                  + " assignments so far: " + assignment);
+            }
           }
+          break;
         }
-        break;
       }
-    } 
-    
-    return assignment;
+
+      return assignment;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
@@ -628,7 +666,7 @@ public class ParentQueue extends AbstractCSQueue {
     return childrenList.iterator();
   }
   
-  private synchronized CSAssignment assignContainersToChildQueues(
+  private CSAssignment assignContainersToChildQueues(
       Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
       SchedulingMode schedulingMode) {
     CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
@@ -717,39 +755,45 @@ public class ParentQueue extends AbstractCSQueue {
     }
   }
   
-  private synchronized void internalReleaseResource(Resource clusterResource,
+  private void internalReleaseResource(Resource clusterResource,
      FiCaSchedulerNode node, Resource releasedResource, boolean changeResource,
       CSQueue completedChildQueue, boolean sortQueues) {
-    super.releaseResource(clusterResource,
-        releasedResource, node.getPartition(),
-        changeResource);
+    try {
+      writeLock.lock();
+      super.releaseResource(clusterResource, releasedResource,
+          node.getPartition(), changeResource);
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("completedContainer " + this + ", cluster=" + clusterResource);
-    }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "completedContainer " + this + ", cluster=" + clusterResource);
+      }
 
-    // Note that this is using an iterator on the childQueues so this can't
-    // be called if already within an iterator for the childQueues. Like
-    // from assignContainersToChildQueues.
-    if (sortQueues) {
-      // reinsert the updated queue
-      for (Iterator<CSQueue> iter = childQueues.iterator(); iter.hasNext();) {
-        CSQueue csqueue = iter.next();
-        if (csqueue.equals(completedChildQueue)) {
-          iter.remove();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Re-sorting completed queue: " + csqueue);
+      // Note that this is using an iterator on the childQueues so this can't
+      // be called if already within an iterator for the childQueues. Like
+      // from assignContainersToChildQueues.
+      if (sortQueues) {
+        // reinsert the updated queue
+        for (Iterator<CSQueue> iter = childQueues.iterator();
+             iter.hasNext(); ) {
+          CSQueue csqueue = iter.next();
+          if (csqueue.equals(completedChildQueue)) {
+            iter.remove();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Re-sorting completed queue: " + csqueue);
+            }
+            childQueues.add(csqueue);
+            break;
           }
-          childQueues.add(csqueue);
-          break;
         }
       }
-    }
 
-    // If we skipped sort queue this time, we need to resort queues to make
-    // sure we allocate from least usage (or order defined by queue policy)
-    // queues.
-    needToResortQueuesAtNextAllocation = !sortQueues;
+      // If we skipped sort queue this time, we need to resort queues to make
+      // sure we allocate from least usage (or order defined by queue policy)
+      // queues.
+      needToResortQueuesAtNextAllocation = !sortQueues;
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   @Override
@@ -806,24 +850,35 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized void updateClusterResource(Resource clusterResource,
+  public void updateClusterResource(Resource clusterResource,
       ResourceLimits resourceLimits) {
-    // Update all children
-    for (CSQueue childQueue : childQueues) {
-      // Get ResourceLimits of child queue before assign containers
-      ResourceLimits childLimits = getResourceLimitsOfChild(childQueue,
-          clusterResource, resourceLimits.getLimit(),
-          RMNodeLabelsManager.NO_LABEL);
-      childQueue.updateClusterResource(clusterResource, childLimits);
+    try {
+      writeLock.lock();
+      // Update all children
+      for (CSQueue childQueue : childQueues) {
+        // Get ResourceLimits of child queue before assign containers
+        ResourceLimits childLimits = getResourceLimitsOfChild(childQueue,
+            clusterResource, resourceLimits.getLimit(),
+            RMNodeLabelsManager.NO_LABEL);
+        childQueue.updateClusterResource(clusterResource, childLimits);
+      }
+
+      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+          minimumAllocation, this, labelManager, null);
+    } finally {
+      writeLock.unlock();
     }
-    
-    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
-        minimumAllocation, this, labelManager, null);
   }
   
   @Override
-  public synchronized List<CSQueue> getChildQueues() {
-    return new ArrayList<CSQueue>(childQueues);
+  public List<CSQueue> getChildQueues() {
+    try {
+      readLock.lock();
+      return new ArrayList<CSQueue>(childQueues);
+    } finally {
+      readLock.unlock();
+    }
+
   }
   
   @Override
@@ -832,13 +887,18 @@ public class ParentQueue extends AbstractCSQueue {
     if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
       return;
     }
-    // Careful! Locking order is important! 
-    synchronized (this) {
-      FiCaSchedulerNode node =
-          scheduler.getNode(rmContainer.getContainer().getNodeId());
+
+    // Careful! Locking order is important!
+    try {
+      writeLock.lock();
+      FiCaSchedulerNode node = scheduler.getNode(
+          rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource,
           rmContainer.getContainer().getResource(), node.getPartition(), false);
+    } finally {
+      writeLock.unlock();
     }
+
     if (parent != null) {
       parent.recoverContainer(clusterResource, attempt, rmContainer);
     }
@@ -851,11 +911,17 @@ public class ParentQueue extends AbstractCSQueue {
   }
 
   @Override
-  public synchronized void collectSchedulerApplications(
+  public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
-    for (CSQueue queue : childQueues) {
-      queue.collectSchedulerApplications(apps);
+    try {
+      readLock.lock();
+      for (CSQueue queue : childQueues) {
+        queue.collectSchedulerApplications(apps);
+      }
+    } finally {
+      readLock.unlock();
     }
+
   }
 
   @Override
@@ -897,44 +963,49 @@ public class ParentQueue extends AbstractCSQueue {
     }
   }
   
-  public synchronized int getNumApplications() {
+  public int getNumApplications() {
     return numApplications;
   }
 
-  synchronized void allocateResource(Resource clusterResource,
+  void allocateResource(Resource clusterResource,
       Resource resource, String nodePartition, boolean changeContainerResource) {
-    super.allocateResource(clusterResource, resource, nodePartition,
-        changeContainerResource);
-
-    /**
-     * check if we need to kill (killable) containers if maximum resource violated.
-     * Doing this because we will deduct killable resource when going from root.
-     * For example:
-     * <pre>
-     *      Root
-     *      /   \
-     *     a     b
-     *   /  \
-     *  a1  a2
-     * </pre>
-     *
-     * a: max=10G, used=10G, killable=2G
-     * a1: used=8G, killable=2G
-     * a2: used=2G, pending=2G, killable=0G
-     *
-     * When we get queue-a to allocate resource, even if queue-a
-     * reaches its max resource, we deduct its used by killable, so we can allocate
-     * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G.
-     *
-     * If scheduler finds a 2G available resource in existing cluster, and assigns it
-     * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G
-     *
-     * When this happens, we have to preempt killable container (on same or different
-     * nodes) of parent queue to avoid violating parent's max resource.
-     */
-    if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
-        < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
-      killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
+    try {
+      writeLock.lock();
+      super.allocateResource(clusterResource, resource, nodePartition,
+          changeContainerResource);
+
+      /**
+       * check if we need to kill (killable) containers if maximum resource violated.
+       * Doing this because we will deduct killable resource when going from root.
+       * For example:
+       * <pre>
+       *      Root
+       *      /   \
+       *     a     b
+       *   /  \
+       *  a1  a2
+       * </pre>
+       *
+       * a: max=10G, used=10G, killable=2G
+       * a1: used=8G, killable=2G
+       * a2: used=2G, pending=2G, killable=0G
+       *
+       * When we get queue-a to allocate resource, even if queue-a
+       * reaches its max resource, we deduct its used by killable, so we can allocate
+       * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G.
+       *
+       * If scheduler finds a 2G available resource in existing cluster, and assigns it
+       * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G
+       *
+       * When this happens, we have to preempt killable container (on same or different
+       * nodes) of parent queue to avoid violating parent's max resource.
+       */
+      if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
+          < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
+        killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 7b53ad5..a391f25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -79,76 +79,98 @@ public class PlanQueue extends ParentQueue {
   }
 
   @Override
-  public synchronized void reinitialize(CSQueue newlyParsedQueue,
+  public void reinitialize(CSQueue newlyParsedQueue,
       Resource clusterResource) throws IOException {
-    // Sanity check
-    if (!(newlyParsedQueue instanceof PlanQueue)
-        || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
-      throw new IOException("Trying to reinitialize " + getQueuePath()
-          + " from " + newlyParsedQueue.getQueuePath());
-    }
+    try {
+      writeLock.lock();
+      // Sanity check
+      if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
+          .getQueuePath().equals(getQueuePath())) {
+        throw new IOException(
+            "Trying to reinitialize " + getQueuePath() + " from "
+                + newlyParsedQueue.getQueuePath());
+      }
 
-    PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
+      PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
 
-    if (newlyParsedParentQueue.getChildQueues().size() > 0) {
-      throw new IOException(
-          "Reservable Queue should not have sub-queues in the"
-              + "configuration");
-    }
+      if (newlyParsedParentQueue.getChildQueues().size() > 0) {
+        throw new IOException(
+            "Reservable Queue should not have sub-queues in the"
+                + "configuration");
+      }
 
-    // Set new configs
-    setupQueueConfigs(clusterResource);
+      // Set new configs
+      setupQueueConfigs(clusterResource);
 
-    updateQuotas(newlyParsedParentQueue.userLimit,
-        newlyParsedParentQueue.userLimitFactor,
-        newlyParsedParentQueue.maxAppsForReservation,
-        newlyParsedParentQueue.maxAppsPerUserForReservation);
+      updateQuotas(newlyParsedParentQueue.userLimit,
+          newlyParsedParentQueue.userLimitFactor,
+          newlyParsedParentQueue.maxAppsForReservation,
+          newlyParsedParentQueue.maxAppsPerUserForReservation);
 
-    // run reinitialize on each existing queue, to trigger absolute cap
-    // recomputations
-    for (CSQueue res : this.getChildQueues()) {
-      res.reinitialize(res, clusterResource);
+      // run reinitialize on each existing queue, to trigger absolute cap
+      // recomputations
+      for (CSQueue res : this.getChildQueues()) {
+        res.reinitialize(res, clusterResource);
+      }
+      showReservationsAsQueues =
+          newlyParsedParentQueue.showReservationsAsQueues;
+    } finally {
+      writeLock.unlock();
     }
-    showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues;
   }
 
-  synchronized void addChildQueue(CSQueue newQueue)
+  void addChildQueue(CSQueue newQueue)
       throws SchedulerDynamicEditException {
-    if (newQueue.getCapacity() > 0) {
-      throw new SchedulerDynamicEditException("Queue " + newQueue
-          + " being added has non zero capacity.");
-    }
-    boolean added = this.childQueues.add(newQueue);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("updateChildQueues (action: add queue): " + added + " "
-          + getChildQueuesToPrint());
+    try {
+      writeLock.lock();
+      if (newQueue.getCapacity() > 0) {
+        throw new SchedulerDynamicEditException(
+            "Queue " + newQueue + " being added has non zero capacity.");
+      }
+      boolean added = this.childQueues.add(newQueue);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("updateChildQueues (action: add queue): " + added + " "
+            + getChildQueuesToPrint());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  synchronized void removeChildQueue(CSQueue remQueue)
+  void removeChildQueue(CSQueue remQueue)
       throws SchedulerDynamicEditException {
-    if (remQueue.getCapacity() > 0) {
-      throw new SchedulerDynamicEditException("Queue " + remQueue
-          + " being removed has non zero capacity.");
-    }
-    Iterator<CSQueue> qiter = childQueues.iterator();
-    while (qiter.hasNext()) {
-      CSQueue cs = qiter.next();
-      if (cs.equals(remQueue)) {
-        qiter.remove();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Removed child queue: {}", cs.getQueueName());
+    try {
+      writeLock.lock();
+      if (remQueue.getCapacity() > 0) {
+        throw new SchedulerDynamicEditException(
+            "Queue " + remQueue + " being removed has non zero capacity.");
+      }
+      Iterator<CSQueue> qiter = childQueues.iterator();
+      while (qiter.hasNext()) {
+        CSQueue cs = qiter.next();
+        if (cs.equals(remQueue)) {
+          qiter.remove();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Removed child queue: {}", cs.getQueueName());
+          }
         }
       }
+    } finally {
+      writeLock.unlock();
     }
   }
 
-  protected synchronized float sumOfChildCapacities() {
-    float ret = 0;
-    for (CSQueue l : childQueues) {
-      ret += l.getCapacity();
+  protected float sumOfChildCapacities() {
+    try {
+      writeLock.lock();
+      float ret = 0;
+      for (CSQueue l : childQueues) {
+        ret += l.getCapacity();
+      }
+      return ret;
+    } finally {
+      writeLock.unlock();
     }
-    return ret;
   }
 
   private void updateQuotas(int userLimit, float userLimitFactor,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index 976cf8c..faeb37e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -51,22 +51,28 @@ public class ReservationQueue extends LeafQueue {
   }
 
   @Override
-  public synchronized void reinitialize(CSQueue newlyParsedQueue,
+  public void reinitialize(CSQueue newlyParsedQueue,
       Resource clusterResource) throws IOException {
-    // Sanity check
-    if (!(newlyParsedQueue instanceof ReservationQueue)
-        || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
-      throw new IOException("Trying to reinitialize " + getQueuePath()
-          + " from " + newlyParsedQueue.getQueuePath());
-    }
-    super.reinitialize(newlyParsedQueue, clusterResource);
-    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
-        minimumAllocation, this, labelManager, null);
+    try {
+      writeLock.lock();
+      // Sanity check
+      if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue
+          .getQueuePath().equals(getQueuePath())) {
+        throw new IOException(
+            "Trying to reinitialize " + getQueuePath() + " from "
+                + newlyParsedQueue.getQueuePath());
+      }
+      super.reinitialize(newlyParsedQueue, clusterResource);
+      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+          minimumAllocation, this, labelManager, null);
 
-    updateQuotas(parent.getUserLimitForReservation(),
-        parent.getUserLimitFactor(),
-        parent.getMaxApplicationsForReservations(),
-        parent.getMaxApplicationsPerUserForReservation());
+      updateQuotas(parent.getUserLimitForReservation(),
+          parent.getUserLimitFactor(),
+          parent.getMaxApplicationsForReservations(),
+          parent.getMaxApplicationsPerUserForReservation());
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   /**
@@ -77,21 +83,26 @@ public class ReservationQueue extends LeafQueue {
    *          maxCapacity, etc..)
    * @throws SchedulerDynamicEditException
    */
-  public synchronized void setEntitlement(QueueEntitlement entitlement)
+  public void setEntitlement(QueueEntitlement entitlement)
       throws SchedulerDynamicEditException {
-    float capacity = entitlement.getCapacity();
-    if (capacity < 0 || capacity > 1.0f) {
-      throw new SchedulerDynamicEditException(
-          "Capacity demand is not in the [0,1] range: " + capacity);
-    }
-    setCapacity(capacity);
-    setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
-    // note: we currently set maxCapacity to capacity
-    // this might be revised later
-    setMaxCapacity(entitlement.getMaxCapacity());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("successfully changed to " + capacity + " for queue "
-          + this.getQueueName());
+    try {
+      writeLock.lock();
+      float capacity = entitlement.getCapacity();
+      if (capacity < 0 || capacity > 1.0f) {
+        throw new SchedulerDynamicEditException(
+            "Capacity demand is not in the [0,1] range: " + capacity);
+      }
+      setCapacity(capacity);
+      setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
+      // note: we currently set maxCapacity to capacity
+      // this might be revised later
+      setMaxCapacity(entitlement.getMaxCapacity());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("successfully changed to " + capacity + " for queue " + this
+            .getQueueName());
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 6fba22a..2614630 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -828,8 +828,8 @@ public class TestContainerResizing {
         app.getAppAttemptResourceUsage().getPending().getMemorySize());
     // Queue/user/application's usage will be updated
     checkUsedResource(rm1, "default", 0 * GB, null);
-    Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default"))
-        .getUser("user").getUsed().getMemorySize());
+    // User will be removed
+    Assert.assertNull(((LeafQueue) cs.getQueue("default")).getUser("user"));
     Assert.assertEquals(0 * GB,
         app.getAppAttemptResourceUsage().getReserved().getMemorySize());
     Assert.assertEquals(0 * GB,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to