http://git-wip-us.apache.org/repos/asf/hadoop/blob/3acd30df/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 843149f..1c00fc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -82,6 +82,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -92,11 +93,11 @@ public class LeafQueue extends AbstractCSQueue { private static final Log LOG = LogFactory.getLog(LeafQueue.class); private float absoluteUsedCapacity = 0.0f; - private int userLimit; - private float userLimitFactor; + private volatile int userLimit; + private volatile float userLimitFactor; protected int maxApplications; - protected int maxApplicationsPerUser; + protected volatile int maxApplicationsPerUser; private float maxAMResourcePerQueuePercent; @@ -104,15 +105,15 @@ public class LeafQueue extends AbstractCSQueue { private volatile boolean rackLocalityFullReset; Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = - new HashMap<ApplicationAttemptId, FiCaSchedulerApp>(); + new ConcurrentHashMap<>(); private Priority defaultAppPriorityPerQueue; - private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null; + private final OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy; private volatile float minimumAllocationFactor; - private Map<String, User> users = new HashMap<String, User>(); + private Map<String, User> users = new ConcurrentHashMap<>(); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -129,7 +130,7 @@ public class LeafQueue extends AbstractCSQueue { private volatile ResourceLimits cachedResourceLimitsForHeadroom = null; - private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null; + private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null; // Summation of consumed ratios for all users in queue private float totalUserConsumedRatio = 0; @@ -138,7 +139,7 @@ public class LeafQueue extends AbstractCSQueue { // record all ignore partition exclusivityRMContainer, this will be used to do // preemption, key is the partition of the RMContainer allocated on private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers = - new HashMap<>(); + new ConcurrentHashMap<>(); @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, @@ -161,125 +162,125 @@ public class LeafQueue extends AbstractCSQueue { setupQueueConfigs(cs.getClusterResource()); } - protected synchronized void setupQueueConfigs(Resource clusterResource) + protected void setupQueueConfigs(Resource clusterResource) throws IOException { - super.setupQueueConfigs(clusterResource); - - this.lastClusterResource = clusterResource; - - this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource); - - // Initialize headroom info, also used for calculating application - // master resource limits. Since this happens during queue initialization - // and all queues may not be realized yet, we'll use (optimistic) - // absoluteMaxCapacity (it will be replaced with the more accurate - // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - setQueueResourceLimitsInfo(clusterResource); + try { + writeLock.lock(); + super.setupQueueConfigs(clusterResource); - CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + this.lastClusterResource = clusterResource; - setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath())); + this.cachedResourceLimitsForHeadroom = new ResourceLimits( + clusterResource); - userLimit = conf.getUserLimit(getQueuePath()); - userLimitFactor = conf.getUserLimitFactor(getQueuePath()); + // Initialize headroom info, also used for calculating application + // master resource limits. Since this happens during queue initialization + // and all queues may not be realized yet, we'll use (optimistic) + // absoluteMaxCapacity (it will be replaced with the more accurate + // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) + setQueueResourceLimitsInfo(clusterResource); - maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); - if (maxApplications < 0) { - int maxSystemApps = conf.getMaximumSystemApplications(); - maxApplications = - (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); - } - maxApplicationsPerUser = Math.min(maxApplications, - (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor)); - - maxAMResourcePerQueuePercent = - conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath()); - - if (!SchedulerUtils.checkQueueLabelExpression( - this.accessibleLabels, this.defaultLabelExpression, null)) { - throw new IOException("Invalid default label expression of " - + " queue=" - + getQueueName() - + " doesn't have permission to access all labels " - + "in default label expression. labelExpression of resource request=" - + (this.defaultLabelExpression == null ? "" - : this.defaultLabelExpression) - + ". Queue labels=" - + (getAccessibleNodeLabels() == null ? "" : StringUtils.join( - getAccessibleNodeLabels().iterator(), ','))); - } - - nodeLocalityDelay = conf.getNodeLocalityDelay(); - rackLocalityFullReset = conf.getRackLocalityFullReset(); + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - // re-init this since max allocation could have changed - this.minimumAllocationFactor = - Resources.ratio(resourceCalculator, - Resources.subtract(maximumAllocation, minimumAllocation), - maximumAllocation); + setOrderingPolicy( + conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath())); - StringBuilder aclsString = new StringBuilder(); - for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) { - aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); - } + userLimit = conf.getUserLimit(getQueuePath()); + userLimitFactor = conf.getUserLimitFactor(getQueuePath()); - StringBuilder labelStrBuilder = new StringBuilder(); - if (accessibleLabels != null) { - for (String s : accessibleLabels) { - labelStrBuilder.append(s); - labelStrBuilder.append(","); + maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); + if (maxApplications < 0) { + int maxSystemApps = conf.getMaximumSystemApplications(); + maxApplications = + (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); } - } + maxApplicationsPerUser = Math.min(maxApplications, + (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor)); + + maxAMResourcePerQueuePercent = + conf.getMaximumApplicationMasterResourcePerQueuePercent( + getQueuePath()); + + if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, + this.defaultLabelExpression, null)) { + throw new IOException( + "Invalid default label expression of " + " queue=" + getQueueName() + + " doesn't have permission to access all labels " + + "in default label expression. labelExpression of resource request=" + + (this.defaultLabelExpression == null ? + "" : + this.defaultLabelExpression) + ". Queue labels=" + ( + getAccessibleNodeLabels() == null ? + "" : + StringUtils + .join(getAccessibleNodeLabels().iterator(), ','))); + } + + nodeLocalityDelay = conf.getNodeLocalityDelay(); + rackLocalityFullReset = conf.getRackLocalityFullReset(); + + // re-init this since max allocation could have changed + this.minimumAllocationFactor = Resources.ratio(resourceCalculator, + Resources.subtract(maximumAllocation, minimumAllocation), + maximumAllocation); - defaultAppPriorityPerQueue = Priority.newInstance(conf - .getDefaultApplicationPriorityConfPerQueue(getQueuePath())); - - LOG.info("Initializing " + queueName + "\n" + - "capacity = " + queueCapacities.getCapacity() + - " [= (float) configuredCapacity / 100 ]" + "\n" + - "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity() + - " [= parentAbsoluteCapacity * capacity ]" + "\n" + - "maxCapacity = " + queueCapacities.getMaximumCapacity() + - " [= configuredMaxCapacity ]" + "\n" + - "absoluteMaxCapacity = " + queueCapacities.getAbsoluteMaximumCapacity() + - " [= 1.0 maximumCapacity undefined, " + - "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + - "\n" + - "userLimit = " + userLimit + - " [= configuredUserLimit ]" + "\n" + - "userLimitFactor = " + userLimitFactor + - " [= configuredUserLimitFactor ]" + "\n" + - "maxApplications = " + maxApplications + - " [= configuredMaximumSystemApplicationsPerQueue or" + - " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" + - "\n" + - "maxApplicationsPerUser = " + maxApplicationsPerUser + - " [= (int)(maxApplications * (userLimit / 100.0f) * " + - "userLimitFactor) ]" + "\n" + - "usedCapacity = " + queueCapacities.getUsedCapacity() + - " [= usedResourcesMemory / " + - "(clusterResourceMemory * absoluteCapacity)]" + "\n" + - "absoluteUsedCapacity = " + absoluteUsedCapacity + - " [= usedResourcesMemory / clusterResourceMemory]" + "\n" + - "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent + - " [= configuredMaximumAMResourcePercent ]" + "\n" + - "minimumAllocationFactor = " + minimumAllocationFactor + - " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " + - "maximumAllocationMemory ]" + "\n" + - "maximumAllocation = " + maximumAllocation + - " [= configuredMaxAllocation ]" + "\n" + - "numContainers = " + numContainers + - " [= currentNumContainers ]" + "\n" + - "state = " + state + - " [= configuredState ]" + "\n" + - "acls = " + aclsString + - " [= configuredAcls ]" + "\n" + - "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + - "labels=" + labelStrBuilder.toString() + "\n" + - "reservationsContinueLooking = " + - reservationsContinueLooking + "\n" + - "preemptionDisabled = " + getPreemptionDisabled() + "\n" + - "defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue); + StringBuilder aclsString = new StringBuilder(); + for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) { + aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); + } + + StringBuilder labelStrBuilder = new StringBuilder(); + if (accessibleLabels != null) { + for (String s : accessibleLabels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } + } + + defaultAppPriorityPerQueue = Priority.newInstance( + conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath())); + + LOG.info( + "Initializing " + queueName + "\n" + "capacity = " + queueCapacities + .getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" + + "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity() + + " [= parentAbsoluteCapacity * capacity ]" + "\n" + + "maxCapacity = " + queueCapacities.getMaximumCapacity() + + " [= configuredMaxCapacity ]" + "\n" + "absoluteMaxCapacity = " + + queueCapacities.getAbsoluteMaximumCapacity() + + " [= 1.0 maximumCapacity undefined, " + + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + + "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]" + + "\n" + "userLimitFactor = " + userLimitFactor + + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = " + + maxApplications + + " [= configuredMaximumSystemApplicationsPerQueue or" + + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" + + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser + + " [= (int)(maxApplications * (userLimit / 100.0f) * " + + "userLimitFactor) ]" + "\n" + "usedCapacity = " + + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / " + + "(clusterResourceMemory * absoluteCapacity)]" + "\n" + + "absoluteUsedCapacity = " + absoluteUsedCapacity + + " [= usedResourcesMemory / clusterResourceMemory]" + "\n" + + "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent + + " [= configuredMaximumAMResourcePercent ]" + "\n" + + "minimumAllocationFactor = " + minimumAllocationFactor + + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " + + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = " + + maximumAllocation + " [= configuredMaxAllocation ]" + "\n" + + "numContainers = " + numContainers + + " [= currentNumContainers ]" + "\n" + "state = " + state + + " [= configuredState ]" + "\n" + "acls = " + aclsString + + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = " + + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder + .toString() + "\n" + "reservationsContinueLooking = " + + reservationsContinueLooking + "\n" + "preemptionDisabled = " + + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = " + + defaultAppPriorityPerQueue); + } finally { + writeLock.unlock(); + } } @Override @@ -307,7 +308,7 @@ public class LeafQueue extends AbstractCSQueue { return maxApplications; } - public synchronized int getMaxApplicationsPerUser() { + public int getMaxApplicationsPerUser() { return maxApplicationsPerUser; } @@ -325,7 +326,8 @@ public class LeafQueue extends AbstractCSQueue { * Set user limit - used only for testing. * @param userLimit new user limit */ - synchronized void setUserLimit(int userLimit) { + @VisibleForTesting + void setUserLimit(int userLimit) { this.userLimit = userLimit; } @@ -333,50 +335,74 @@ public class LeafQueue extends AbstractCSQueue { * Set user limit factor - used only for testing. * @param userLimitFactor new user limit factor */ - synchronized void setUserLimitFactor(float userLimitFactor) { + @VisibleForTesting + void setUserLimitFactor(float userLimitFactor) { this.userLimitFactor = userLimitFactor; } @Override - public synchronized int getNumApplications() { - return getNumPendingApplications() + getNumActiveApplications(); - } - - public synchronized int getNumPendingApplications() { - return pendingOrderingPolicy.getNumSchedulableEntities(); + public int getNumApplications() { + try { + readLock.lock(); + return getNumPendingApplications() + getNumActiveApplications(); + } finally { + readLock.unlock(); + } } - public synchronized int getNumActiveApplications() { - return orderingPolicy.getNumSchedulableEntities(); + public int getNumPendingApplications() { + try { + readLock.lock(); + return pendingOrderingPolicy.getNumSchedulableEntities(); + } finally { + readLock.unlock(); + } } - @Private - public synchronized int getNumApplications(String user) { - return getUser(user).getTotalApplications(); + public int getNumActiveApplications() { + try { + readLock.lock(); + return orderingPolicy.getNumSchedulableEntities(); + } finally { + readLock.unlock(); + } } @Private - public synchronized int getNumPendingApplications(String user) { - return getUser(user).getPendingApplications(); + public int getNumPendingApplications(String user) { + try { + readLock.lock(); + User u = getUser(user); + if (null == u) { + return 0; + } + return u.getPendingApplications(); + } finally { + readLock.unlock(); + } } @Private - public synchronized int getNumActiveApplications(String user) { - return getUser(user).getActiveApplications(); - } - - @Override - public synchronized QueueState getState() { - return state; + public int getNumActiveApplications(String user) { + try { + readLock.lock(); + User u = getUser(user); + if (null == u) { + return 0; + } + return u.getActiveApplications(); + } finally { + readLock.unlock(); + } } @Private - public synchronized int getUserLimit() { + public int getUserLimit() { return userLimit; } @Private - public synchronized float getUserLimitFactor() { + public float getUserLimitFactor() { return userLimitFactor; } @@ -388,112 +414,145 @@ public class LeafQueue extends AbstractCSQueue { } @Override - public synchronized List<QueueUserACLInfo> + public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) { - QueueUserACLInfo userAclInfo = - recordFactory.newRecordInstance(QueueUserACLInfo.class); - List<QueueACL> operations = new ArrayList<QueueACL>(); - for (QueueACL operation : QueueACL.values()) { - if (hasAccess(operation, user)) { - operations.add(operation); + try { + readLock.lock(); + QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( + QueueUserACLInfo.class); + List<QueueACL> operations = new ArrayList<>(); + for (QueueACL operation : QueueACL.values()) { + if (hasAccess(operation, user)) { + operations.add(operation); + } } + + userAclInfo.setQueueName(getQueueName()); + userAclInfo.setUserAcls(operations); + return Collections.singletonList(userAclInfo); + } finally { + readLock.unlock(); } - userAclInfo.setQueueName(getQueueName()); - userAclInfo.setUserAcls(operations); - return Collections.singletonList(userAclInfo); } public String toString() { - return queueName + ": " + - "capacity=" + queueCapacities.getCapacity() + ", " + - "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + - "usedResources=" + queueUsage.getUsed() + ", " + - "usedCapacity=" + getUsedCapacity() + ", " + - "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + - "numApps=" + getNumApplications() + ", " + - "numContainers=" + getNumContainers(); + try { + readLock.lock(); + return queueName + ": " + "capacity=" + queueCapacities.getCapacity() + + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + + ", " + "usedResources=" + queueUsage.getUsed() + ", " + + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications() + + ", " + "numContainers=" + getNumContainers(); + } finally { + readLock.unlock(); + } + } - + @VisibleForTesting - public synchronized void setNodeLabelManager(RMNodeLabelsManager mgr) { - this.labelManager = mgr; + public User getUser(String userName) { + return users.get(userName); } - @VisibleForTesting - public synchronized User getUser(String userName) { - User user = users.get(userName); - if (user == null) { - user = new User(); - users.put(userName, user); + // Get and add user if absent + private User getUserAndAddIfAbsent(String userName) { + try { + writeLock.lock(); + User u = users.get(userName); + if (null == u) { + u = new User(); + users.put(userName, u); + } + return u; + } finally { + writeLock.unlock(); } - return user; } /** * @return an ArrayList of UserInfo objects who are active in this queue */ - public synchronized ArrayList<UserInfo> getUsers() { - ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>(); - for (Map.Entry<String, User> entry : users.entrySet()) { - User user = entry.getValue(); - usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(user - .getAllUsed()), user.getActiveApplications(), user - .getPendingApplications(), Resources.clone(user - .getConsumedAMResources()), Resources.clone(user - .getUserResourceLimit()), user.getResourceUsage())); + public ArrayList<UserInfo> getUsers() { + try { + readLock.lock(); + ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>(); + for (Map.Entry<String, User> entry : users.entrySet()) { + User user = entry.getValue(); + usersToReturn.add( + new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()), + user.getActiveApplications(), user.getPendingApplications(), + Resources.clone(user.getConsumedAMResources()), + Resources.clone(user.getUserResourceLimit()), + user.getResourceUsage())); + } + return usersToReturn; + } finally { + readLock.unlock(); } - return usersToReturn; } @Override - public synchronized void reinitialize( + public void reinitialize( CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - // Sanity check - if (!(newlyParsedQueue instanceof LeafQueue) || - !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { - throw new IOException("Trying to reinitialize " + getQueuePath() + - " from " + newlyParsedQueue.getQueuePath()); - } - - LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue; + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } - // don't allow the maximum allocation to be decreased in size - // since we have already told running AM's the size - Resource oldMax = getMaximumAllocation(); - Resource newMax = newlyParsedLeafQueue.getMaximumAllocation(); - if (newMax.getMemorySize() < oldMax.getMemorySize() - || newMax.getVirtualCores() < oldMax.getVirtualCores()) { - throw new IOException( - "Trying to reinitialize " - + getQueuePath() - + " the maximum allocation size can not be decreased!" - + " Current setting: " + oldMax - + ", trying to set it to: " + newMax); - } + LeafQueue newlyParsedLeafQueue = (LeafQueue) newlyParsedQueue; + + // don't allow the maximum allocation to be decreased in size + // since we have already told running AM's the size + Resource oldMax = getMaximumAllocation(); + Resource newMax = newlyParsedLeafQueue.getMaximumAllocation(); + if (newMax.getMemorySize() < oldMax.getMemorySize() + || newMax.getVirtualCores() < oldMax.getVirtualCores()) { + throw new IOException("Trying to reinitialize " + getQueuePath() + + " the maximum allocation size can not be decreased!" + + " Current setting: " + oldMax + ", trying to set it to: " + + newMax); + } - setupQueueConfigs(clusterResource); + setupQueueConfigs(clusterResource); - // queue metrics are updated, more resource may be available - // activate the pending applications if possible - activateApplications(); + // queue metrics are updated, more resource may be available + // activate the pending applications if possible + activateApplications(); + } finally { + writeLock.unlock(); + } } @Override public void submitApplicationAttempt(FiCaSchedulerApp application, String userName) { // Careful! Locking order is important! - synchronized (this) { - User user = getUser(userName); + try { + writeLock.lock(); + + // TODO, should use getUser, use this method just to avoid UT failure + // which is caused by wrong invoking order, will fix UT separately + User user = getUserAndAddIfAbsent(userName); + // Add the attempt to our data-structures addApplicationAttempt(application, user); + } finally { + writeLock.unlock(); } // We don't want to update metrics for move app if (application.isPending()) { metrics.submitAppAttempt(userName); } + getParent().submitApplicationAttempt(application, userName); } @@ -501,37 +560,38 @@ public class LeafQueue extends AbstractCSQueue { public void submitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { // Careful! Locking order is important! - - User user = null; - synchronized (this) { - + try { + writeLock.lock(); // Check if the queue is accepting jobs if (getState() != QueueState.RUNNING) { - String msg = "Queue " + getQueuePath() + - " is STOPPED. Cannot accept submission of application: " + applicationId; + String msg = "Queue " + getQueuePath() + + " is STOPPED. Cannot accept submission of application: " + + applicationId; LOG.info(msg); throw new AccessControlException(msg); } // Check submission limits for queues if (getNumApplications() >= getMaxApplications()) { - String msg = "Queue " + getQueuePath() + - " already has " + getNumApplications() + " applications," + - " cannot accept submission of application: " + applicationId; + String msg = + "Queue " + getQueuePath() + " already has " + getNumApplications() + + " applications," + + " cannot accept submission of application: " + applicationId; LOG.info(msg); throw new AccessControlException(msg); } // Check submission limits for the user on this queue - user = getUser(userName); + User user = getUserAndAddIfAbsent(userName); if (user.getTotalApplications() >= getMaxApplicationsPerUser()) { - String msg = "Queue " + getQueuePath() + - " already has " + user.getTotalApplications() + - " applications from user " + userName + - " cannot accept submission of application: " + applicationId; + String msg = "Queue " + getQueuePath() + " already has " + user + .getTotalApplications() + " applications from user " + userName + + " cannot accept submission of application: " + applicationId; LOG.info(msg); throw new AccessControlException(msg); } + } finally { + writeLock.unlock(); } // Inform the parent queue @@ -553,214 +613,237 @@ public class LeafQueue extends AbstractCSQueue { return queueUsage.getAMLimit(nodePartition); } - public synchronized Resource calculateAndGetAMResourceLimit() { + @VisibleForTesting + public Resource calculateAndGetAMResourceLimit() { return calculateAndGetAMResourceLimitPerPartition( RMNodeLabelsManager.NO_LABEL); } @VisibleForTesting - public synchronized Resource getUserAMResourceLimit() { + public Resource getUserAMResourceLimit() { return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL); } - public synchronized Resource getUserAMResourceLimitPerPartition( + public Resource getUserAMResourceLimitPerPartition( String nodePartition) { - /* - * The user am resource limit is based on the same approach as the user - * limit (as it should represent a subset of that). This means that it uses - * the absolute queue capacity (per partition) instead of the max and is - * modified by the userlimit and the userlimit factor as is the userlimit - */ - float effectiveUserLimit = Math.max(userLimit / 100.0f, - 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); - - Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( - resourceCalculator, - labelManager.getResourceByLabel(nodePartition, lastClusterResource), - queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); - - Resource userAMLimit = Resources.multiplyAndNormalizeUp(resourceCalculator, - queuePartitionResource, - queueCapacities.getMaxAMResourcePercentage(nodePartition) - * effectiveUserLimit * userLimitFactor, minimumAllocation); - return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - userAMLimit, getAMResourceLimitPerPartition(nodePartition)) - ? userAMLimit - : getAMResourceLimitPerPartition(nodePartition); - } - - public synchronized Resource calculateAndGetAMResourceLimitPerPartition( + try { + readLock.lock(); + /* + * The user am resource limit is based on the same approach as the user + * limit (as it should represent a subset of that). This means that it uses + * the absolute queue capacity (per partition) instead of the max and is + * modified by the userlimit and the userlimit factor as is the userlimit + */ + float effectiveUserLimit = Math.max(userLimit / 100.0f, + 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); + + Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( + resourceCalculator, + labelManager.getResourceByLabel(nodePartition, lastClusterResource), + queueCapacities.getAbsoluteCapacity(nodePartition), + minimumAllocation); + + Resource userAMLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, queuePartitionResource, + queueCapacities.getMaxAMResourcePercentage(nodePartition) + * effectiveUserLimit * userLimitFactor, minimumAllocation); + return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ? + userAMLimit : + getAMResourceLimitPerPartition(nodePartition); + } finally { + readLock.unlock(); + } + + } + + public Resource calculateAndGetAMResourceLimitPerPartition( String nodePartition) { - /* - * For non-labeled partition, get the max value from resources currently - * available to the queue and the absolute resources guaranteed for the - * partition in the queue. For labeled partition, consider only the absolute - * resources guaranteed. Multiply this value (based on labeled/ - * non-labeled), * with per-partition am-resource-percent to get the max am - * resource limit for this queue and partition. - */ - Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( - resourceCalculator, - labelManager.getResourceByLabel(nodePartition, lastClusterResource), - queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); - - Resource queueCurrentLimit = Resources.none(); - // For non-labeled partition, we need to consider the current queue - // usage limit. - if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { - synchronized (queueResourceLimitsInfo) { - queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); + try { + writeLock.lock(); + /* + * For non-labeled partition, get the max value from resources currently + * available to the queue and the absolute resources guaranteed for the + * partition in the queue. For labeled partition, consider only the absolute + * resources guaranteed. Multiply this value (based on labeled/ + * non-labeled), * with per-partition am-resource-percent to get the max am + * resource limit for this queue and partition. + */ + Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( + resourceCalculator, + labelManager.getResourceByLabel(nodePartition, lastClusterResource), + queueCapacities.getAbsoluteCapacity(nodePartition), + minimumAllocation); + + Resource queueCurrentLimit = Resources.none(); + // For non-labeled partition, we need to consider the current queue + // usage limit. + if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + synchronized (queueResourceLimitsInfo){ + queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); + } } - } - float amResourcePercent = queueCapacities - .getMaxAMResourcePercentage(nodePartition); + float amResourcePercent = queueCapacities.getMaxAMResourcePercentage( + nodePartition); - // Current usable resource for this queue and partition is the max of - // queueCurrentLimit and queuePartitionResource. - Resource queuePartitionUsableResource = Resources.max(resourceCalculator, - lastClusterResource, queueCurrentLimit, queuePartitionResource); + // Current usable resource for this queue and partition is the max of + // queueCurrentLimit and queuePartitionResource. + Resource queuePartitionUsableResource = Resources.max(resourceCalculator, + lastClusterResource, queueCurrentLimit, queuePartitionResource); - Resource amResouceLimit = Resources.multiplyAndNormalizeUp( - resourceCalculator, queuePartitionUsableResource, amResourcePercent, - minimumAllocation); + Resource amResouceLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, queuePartitionUsableResource, amResourcePercent, + minimumAllocation); - metrics.setAMResouceLimit(amResouceLimit); - queueUsage.setAMLimit(nodePartition, amResouceLimit); - return amResouceLimit; + metrics.setAMResouceLimit(amResouceLimit); + queueUsage.setAMLimit(nodePartition, amResouceLimit); + return amResouceLimit; + } finally { + writeLock.unlock(); + } } - private synchronized void activateApplications() { - // limit of allowed resource usage for application masters - Map<String, Resource> userAmPartitionLimit = - new HashMap<String, Resource>(); - - // AM Resource Limit for accessible labels can be pre-calculated. - // This will help in updating AMResourceLimit for all labels when queue - // is initialized for the first time (when no applications are present). - for (String nodePartition : getNodeLabelsForQueue()) { - calculateAndGetAMResourceLimitPerPartition(nodePartition); - } + private void activateApplications() { + try { + writeLock.lock(); + // limit of allowed resource usage for application masters + Map<String, Resource> userAmPartitionLimit = + new HashMap<String, Resource>(); + + // AM Resource Limit for accessible labels can be pre-calculated. + // This will help in updating AMResourceLimit for all labels when queue + // is initialized for the first time (when no applications are present). + for (String nodePartition : getNodeLabelsForQueue()) { + calculateAndGetAMResourceLimitPerPartition(nodePartition); + } - for (Iterator<FiCaSchedulerApp> fsApp = - getPendingAppsOrderingPolicy().getAssignmentIterator(); - fsApp.hasNext();) { - FiCaSchedulerApp application = fsApp.next(); - ApplicationId applicationId = application.getApplicationId(); + for (Iterator<FiCaSchedulerApp> fsApp = + getPendingAppsOrderingPolicy().getAssignmentIterator(); + fsApp.hasNext(); ) { + FiCaSchedulerApp application = fsApp.next(); + ApplicationId applicationId = application.getApplicationId(); - // Get the am-node-partition associated with each application - // and calculate max-am resource limit for this partition. - String partitionName = application.getAppAMNodePartitionName(); + // Get the am-node-partition associated with each application + // and calculate max-am resource limit for this partition. + String partitionName = application.getAppAMNodePartitionName(); - Resource amLimit = getAMResourceLimitPerPartition(partitionName); - // Verify whether we already calculated am-limit for this label. - if (amLimit == null) { - amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName); - } - // Check am resource limit. - Resource amIfStarted = Resources.add( - application.getAMResource(partitionName), - queueUsage.getAMUsed(partitionName)); + Resource amLimit = getAMResourceLimitPerPartition(partitionName); + // Verify whether we already calculated am-limit for this label. + if (amLimit == null) { + amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName); + } + // Check am resource limit. + Resource amIfStarted = Resources.add( + application.getAMResource(partitionName), + queueUsage.getAMUsed(partitionName)); - if (LOG.isDebugEnabled()) { - LOG.debug("application "+application.getId() +" AMResource " - + application.getAMResource(partitionName) - + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent - + " amLimit " + amLimit + " lastClusterResource " - + lastClusterResource + " amIfStarted " + amIfStarted - + " AM node-partition name " + partitionName); - } + if (LOG.isDebugEnabled()) { + LOG.debug("application " + application.getId() + " AMResource " + + application.getAMResource(partitionName) + + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent + + " amLimit " + amLimit + " lastClusterResource " + + lastClusterResource + " amIfStarted " + amIfStarted + + " AM node-partition name " + partitionName); + } - if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - amIfStarted, amLimit)) { - if (getNumActiveApplications() < 1 - || (Resources.lessThanOrEqual(resourceCalculator, - lastClusterResource, queueUsage.getAMUsed(partitionName), - Resources.none()))) { - LOG.warn("maximum-am-resource-percent is insufficient to start a" - + " single application in queue, it is likely set too low." - + " skipping enforcement to allow at least one application" - + " to start"); - } else { - application.updateAMContainerDiagnostics(AMState.INACTIVATED, - CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED); - LOG.info("Not activating application " + applicationId - + " as amIfStarted: " + amIfStarted + " exceeds amLimit: " - + amLimit); - continue; + if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + amIfStarted, amLimit)) { + if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, + queueUsage.getAMUsed(partitionName), Resources.none()))) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue, it is likely set too low." + + " skipping enforcement to allow at least one application" + + " to start"); + } else{ + application.updateAMContainerDiagnostics(AMState.INACTIVATED, + CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED); + LOG.info("Not activating application " + applicationId + + " as amIfStarted: " + amIfStarted + " exceeds amLimit: " + + amLimit); + continue; + } } - } - // Check user am resource limit - User user = getUser(application.getUser()); - Resource userAMLimit = userAmPartitionLimit.get(partitionName); + // Check user am resource limit + User user = getUser(application.getUser()); + Resource userAMLimit = userAmPartitionLimit.get(partitionName); - // Verify whether we already calculated user-am-limit for this label. - if (userAMLimit == null) { - userAMLimit = getUserAMResourceLimitPerPartition(partitionName); - userAmPartitionLimit.put(partitionName, userAMLimit); - } + // Verify whether we already calculated user-am-limit for this label. + if (userAMLimit == null) { + userAMLimit = getUserAMResourceLimitPerPartition(partitionName); + userAmPartitionLimit.put(partitionName, userAMLimit); + } - Resource userAmIfStarted = Resources.add( - application.getAMResource(partitionName), - user.getConsumedAMResources(partitionName)); - - if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - userAmIfStarted, userAMLimit)) { - if (getNumActiveApplications() < 1 - || (Resources.lessThanOrEqual(resourceCalculator, - lastClusterResource, queueUsage.getAMUsed(partitionName), - Resources.none()))) { - LOG.warn("maximum-am-resource-percent is insufficient to start a" - + " single application in queue for user, it is likely set too" - + " low. skipping enforcement to allow at least one application" - + " to start"); - } else { - application.updateAMContainerDiagnostics(AMState.INACTIVATED, - CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED); - LOG.info("Not activating application " + applicationId - + " for user: " + user + " as userAmIfStarted: " - + userAmIfStarted + " exceeds userAmLimit: " + userAMLimit); - continue; + Resource userAmIfStarted = Resources.add( + application.getAMResource(partitionName), + user.getConsumedAMResources(partitionName)); + + if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + userAmIfStarted, userAMLimit)) { + if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, + queueUsage.getAMUsed(partitionName), Resources.none()))) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue for user, it is likely set too" + + " low. skipping enforcement to allow at least one application" + + " to start"); + } else{ + application.updateAMContainerDiagnostics(AMState.INACTIVATED, + CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED); + LOG.info( + "Not activating application " + applicationId + " for user: " + + user + " as userAmIfStarted: " + userAmIfStarted + + " exceeds userAmLimit: " + userAMLimit); + continue; + } } + user.activateApplication(); + orderingPolicy.addSchedulableEntity(application); + application.updateAMContainerDiagnostics(AMState.ACTIVATED, null); + + queueUsage.incAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().incAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().setAMLimit(partitionName, userAMLimit); + metrics.incAMUsed(application.getUser(), + application.getAMResource(partitionName)); + metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); + fsApp.remove(); + LOG.info("Application " + applicationId + " from user: " + application + .getUser() + " activated in queue: " + getQueueName()); } - user.activateApplication(); - orderingPolicy.addSchedulableEntity(application); - application.updateAMContainerDiagnostics(AMState.ACTIVATED, null); - - queueUsage.incAMUsed(partitionName, - application.getAMResource(partitionName)); - user.getResourceUsage().incAMUsed(partitionName, - application.getAMResource(partitionName)); - user.getResourceUsage().setAMLimit(partitionName, userAMLimit); - metrics.incAMUsed(application.getUser(), - application.getAMResource(partitionName)); - metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); - fsApp.remove(); - LOG.info("Application " + applicationId + " from user: " - + application.getUser() + " activated in queue: " + getQueueName()); + } finally { + writeLock.unlock(); } } - private synchronized void addApplicationAttempt(FiCaSchedulerApp application, + private void addApplicationAttempt(FiCaSchedulerApp application, User user) { - // Accept - user.submitApplication(); - getPendingAppsOrderingPolicy().addSchedulableEntity(application); - applicationAttemptMap.put(application.getApplicationAttemptId(), application); + try { + writeLock.lock(); + // Accept + user.submitApplication(); + getPendingAppsOrderingPolicy().addSchedulableEntity(application); + applicationAttemptMap.put(application.getApplicationAttemptId(), + application); - // Activate applications - activateApplications(); - - LOG.info("Application added -" + - " appId: " + application.getApplicationId() + - " user: " + application.getUser() + "," + - " leaf-queue: " + getQueueName() + - " #user-pending-applications: " + user.getPendingApplications() + - " #user-active-applications: " + user.getActiveApplications() + - " #queue-pending-applications: " + getNumPendingApplications() + - " #queue-active-applications: " + getNumActiveApplications() - ); + // Activate applications + activateApplications(); + + LOG.info( + "Application added -" + " appId: " + application.getApplicationId() + + " user: " + application.getUser() + "," + " leaf-queue: " + + getQueueName() + " #user-pending-applications: " + user + .getPendingApplications() + " #user-active-applications: " + user + .getActiveApplications() + " #queue-pending-applications: " + + getNumPendingApplications() + " #queue-active-applications: " + + getNumActiveApplications()); + } finally { + writeLock.unlock(); + } } @Override @@ -774,49 +857,54 @@ public class LeafQueue extends AbstractCSQueue { @Override public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) { // Careful! Locking order is important! - synchronized (this) { - removeApplicationAttempt(application, getUser(application.getUser())); - } + removeApplicationAttempt(application, application.getUser()); getParent().finishApplicationAttempt(application, queue); } - public synchronized void removeApplicationAttempt( - FiCaSchedulerApp application, User user) { - String partitionName = application.getAppAMNodePartitionName(); - boolean wasActive = - orderingPolicy.removeSchedulableEntity(application); - if (!wasActive) { - pendingOrderingPolicy.removeSchedulableEntity(application); - } else { - queueUsage.decAMUsed(partitionName, - application.getAMResource(partitionName)); - user.getResourceUsage().decAMUsed(partitionName, - application.getAMResource(partitionName)); - metrics.decAMUsed(application.getUser(), - application.getAMResource(partitionName)); - } - applicationAttemptMap.remove(application.getApplicationAttemptId()); + private void removeApplicationAttempt( + FiCaSchedulerApp application, String userName) { + try { + writeLock.lock(); - user.finishApplication(wasActive); - if (user.getTotalApplications() == 0) { - users.remove(application.getUser()); - } + // TODO, should use getUser, use this method just to avoid UT failure + // which is caused by wrong invoking order, will fix UT separately + User user = getUserAndAddIfAbsent(userName); + + String partitionName = application.getAppAMNodePartitionName(); + boolean wasActive = orderingPolicy.removeSchedulableEntity(application); + if (!wasActive) { + pendingOrderingPolicy.removeSchedulableEntity(application); + } else{ + queueUsage.decAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().decAMUsed(partitionName, + application.getAMResource(partitionName)); + metrics.decAMUsed(application.getUser(), + application.getAMResource(partitionName)); + } + applicationAttemptMap.remove(application.getApplicationAttemptId()); + + user.finishApplication(wasActive); + if (user.getTotalApplications() == 0) { + users.remove(application.getUser()); + } - // Check if we can activate more applications - activateApplications(); + // Check if we can activate more applications + activateApplications(); - LOG.info("Application removed -" + - " appId: " + application.getApplicationId() + - " user: " + application.getUser() + - " queue: " + getQueueName() + - " #user-pending-applications: " + user.getPendingApplications() + - " #user-active-applications: " + user.getActiveApplications() + - " #queue-pending-applications: " + getNumPendingApplications() + - " #queue-active-applications: " + getNumActiveApplications() - ); + LOG.info( + "Application removed -" + " appId: " + application.getApplicationId() + + " user: " + application.getUser() + " queue: " + getQueueName() + + " #user-pending-applications: " + user.getPendingApplications() + + " #user-active-applications: " + user.getActiveApplications() + + " #queue-pending-applications: " + getNumPendingApplications() + + " #queue-active-applications: " + getNumActiveApplications()); + } finally { + writeLock.unlock(); + } } - private synchronized FiCaSchedulerApp getApplication( + private FiCaSchedulerApp getApplication( ApplicationAttemptId applicationAttemptId) { return applicationAttemptMap.get(applicationAttemptId); } @@ -878,170 +966,171 @@ public class LeafQueue extends AbstractCSQueue { } @Override - public synchronized CSAssignment assignContainers(Resource clusterResource, + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { - updateCurrentResourceLimits(currentResourceLimits, clusterResource); + try { + writeLock.lock(); + updateCurrentResourceLimits(currentResourceLimits, clusterResource); - if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: node=" + node.getNodeName() - + " #applications=" + orderingPolicy.getNumSchedulableEntities()); - } + if (LOG.isDebugEnabled()) { + LOG.debug( + "assignContainers: node=" + node.getNodeName() + " #applications=" + + orderingPolicy.getNumSchedulableEntities()); + } - setPreemptionAllowed(currentResourceLimits, node.getPartition()); + setPreemptionAllowed(currentResourceLimits, node.getPartition()); - // Check for reserved resources - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - FiCaSchedulerApp application = - getApplication(reservedContainer.getApplicationAttemptId()); + // Check for reserved resources + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + FiCaSchedulerApp application = getApplication( + reservedContainer.getApplicationAttemptId()); - ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, - node.getNodeID(), SystemClock.getInstance().getTime(), application); + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); - synchronized (application) { - CSAssignment assignment = - application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, reservedContainer); + CSAssignment assignment = application.assignContainers(clusterResource, + node, currentResourceLimits, schedulingMode, reservedContainer); handleExcessReservedContainer(clusterResource, assignment, node, application); killToPreemptContainers(clusterResource, node, assignment); return assignment; } - } - // if our queue cannot access this node, just return - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(node.getPartition())) { - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.REJECTED, - ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node - .getPartition()); - return CSAssignment.NULL_ASSIGNMENT; - } - - // Check if this queue need more resource, simply skip allocation if this - // queue doesn't need more resources. - if (!hasPendingResourceRequest(node.getPartition(), clusterResource, - schedulingMode)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + node.getPartition()); + // if our queue cannot access this node, just return + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY + && !accessibleToPartition(node.getPartition())) { + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); + return CSAssignment.NULL_ASSIGNMENT; } - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); - return CSAssignment.NULL_ASSIGNMENT; - } - - for (Iterator<FiCaSchedulerApp> assignmentIterator = - orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) { - FiCaSchedulerApp application = assignmentIterator.next(); - - ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, - node.getNodeID(), SystemClock.getInstance().getTime(), application); - // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), - currentResourceLimits, application.getCurrentReservation(), + // Check if this queue need more resource, simply skip allocation if this + // queue doesn't need more resources. + if (!hasPendingResourceRequest(node.getPartition(), clusterResource, schedulingMode)) { - ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( - activitiesManager, node, - application, application.getPriority(), - ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it doesn't need more resource, schedulingMode=" + + schedulingMode.name() + " node-partition=" + node + .getPartition()); + } ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); + ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); return CSAssignment.NULL_ASSIGNMENT; } - Resource userLimit = - computeUserLimitAndSetHeadroom(application, clusterResource, - node.getPartition(), schedulingMode); - - // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, node.getPartition(), currentResourceLimits)) { - application.updateAMContainerDiagnostics(AMState.ACTIVATED, - "User capacity has reached its maximum limit."); - ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( - activitiesManager, node, - application, application.getPriority(), - ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT); - continue; - } + for (Iterator<FiCaSchedulerApp> assignmentIterator = + orderingPolicy.getAssignmentIterator(); + assignmentIterator.hasNext(); ) { + FiCaSchedulerApp application = assignmentIterator.next(); + + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); + + // Check queue max-capacity limit + if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + currentResourceLimits, application.getCurrentReservation(), + schedulingMode)) { + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + return CSAssignment.NULL_ASSIGNMENT; + } - // Try to schedule - CSAssignment assignment = - application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, null); + Resource userLimit = computeUserLimitAndSetHeadroom(application, + clusterResource, node.getPartition(), schedulingMode); + + // Check user limit + if (!canAssignToUser(clusterResource, application.getUser(), userLimit, + application, node.getPartition(), currentResourceLimits)) { + application.updateAMContainerDiagnostics(AMState.ACTIVATED, + "User capacity has reached its maximum limit."); + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT); + continue; + } - if (LOG.isDebugEnabled()) { - LOG.debug("post-assignContainers for application " - + application.getApplicationId()); - application.showRequests(); - } + // Try to schedule + CSAssignment assignment = application.assignContainers(clusterResource, + node, currentResourceLimits, schedulingMode, null); - // Did we schedule or reserve a container? - Resource assigned = assignment.getResource(); - - handleExcessReservedContainer(clusterResource, assignment, node, - application); - killToPreemptContainers(clusterResource, node, assignment); + if (LOG.isDebugEnabled()) { + LOG.debug("post-assignContainers for application " + application + .getApplicationId()); + application.showRequests(); + } - if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, - Resources.none())) { - // Get reserved or allocated container from application - RMContainer reservedOrAllocatedRMContainer = - application.getRMContainer(assignment.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId()); + // Did we schedule or reserve a container? + Resource assigned = assignment.getResource(); - // Book-keeping - // Note: Update headroom to account for current allocation too... - allocateResource(clusterResource, application, assigned, - node.getPartition(), reservedOrAllocatedRMContainer, - assignment.isIncreasedAllocation()); - - // Update reserved metrics - Resource reservedRes = assignment.getAssignmentInformation() - .getReserved(); - if (reservedRes != null && !reservedRes.equals(Resources.none())) { - incReservedResource(node.getPartition(), reservedRes); - } + handleExcessReservedContainer(clusterResource, assignment, node, + application); + killToPreemptContainers(clusterResource, node, assignment); - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED, - ActivityDiagnosticConstant.EMPTY); + if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, + Resources.none())) { + // Get reserved or allocated container from application + RMContainer reservedOrAllocatedRMContainer = + application.getRMContainer(assignment.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId()); + + // Book-keeping + // Note: Update headroom to account for current allocation too... + allocateResource(clusterResource, application, assigned, + node.getPartition(), reservedOrAllocatedRMContainer, + assignment.isIncreasedAllocation()); + + // Update reserved metrics + Resource reservedRes = + assignment.getAssignmentInformation().getReserved(); + if (reservedRes != null && !reservedRes.equals(Resources.none())) { + incReservedResource(node.getPartition(), reservedRes); + } - // Done - return assignment; - } else if (assignment.getSkippedType() - == CSAssignment.SkippedType.OTHER) { - ActivitiesLogger.APP.finishSkippedAppAllocationRecording( - activitiesManager, application.getApplicationId(), - ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); - application.updateNodeInfoForAMDiagnostics(node); - } else if(assignment.getSkippedType() - == CSAssignment.SkippedType.QUEUE_LIMIT) { - return assignment; - } else { - // If we don't allocate anything, and it is not skipped by application, - // we will return to respect FIFO of applications - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.RESPECT_FIFO); - ActivitiesLogger.APP.finishSkippedAppAllocationRecording( - activitiesManager, application.getApplicationId(), - ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); - return CSAssignment.NULL_ASSIGNMENT; + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + + // Done + return assignment; + } else if (assignment.getSkippedType() + == CSAssignment.SkippedType.OTHER) { + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); + application.updateNodeInfoForAMDiagnostics(node); + } else if (assignment.getSkippedType() + == CSAssignment.SkippedType.QUEUE_LIMIT) { + return assignment; + } else{ + // If we don't allocate anything, and it is not skipped by application, + // we will return to respect FIFO of applications + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.RESPECT_FIFO); + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); + return CSAssignment.NULL_ASSIGNMENT; + } } - } - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); - return CSAssignment.NULL_ASSIGNMENT; + return CSAssignment.NULL_ASSIGNMENT; + } finally { + writeLock.unlock(); + } } protected Resource getHeadroom(User user, Resource queueCurrentLimit, @@ -1116,7 +1205,8 @@ public class LeafQueue extends AbstractCSQueue { } } - @Lock({LeafQueue.class, FiCaSchedulerApp.class}) + // It doesn't necessarily to hold application's lock here. + @Lock({LeafQueue.class}) Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode) { @@ -1288,51 +1378,53 @@ public class LeafQueue extends AbstractCSQueue { } @Private - protected synchronized boolean canAssignToUser(Resource clusterResource, + protected boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, String nodePartition, ResourceLimits currentResourceLimits) { - User user = getUser(userName); - - currentResourceLimits.setAmountNeededUnreserve(Resources.none()); - - // Note: We aren't considering the current request since there is a fixed - // overhead of the AM, but it's a > check, not a >= check, so... - if (Resources - .greaterThan(resourceCalculator, clusterResource, - user.getUsed(nodePartition), - limit)) { - // if enabled, check to see if could we potentially use this node instead - // of a reserved node if the application has reserved containers - if (this.reservationsContinueLooking && - nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) { - if (Resources.lessThanOrEqual( - resourceCalculator, - clusterResource, - Resources.subtract(user.getUsed(), - application.getCurrentReservation()), limit)) { - - if (LOG.isDebugEnabled()) { - LOG.debug("User " + userName + " in queue " + getQueueName() - + " will exceed limit based on reservations - " + " consumed: " - + user.getUsed() + " reserved: " - + application.getCurrentReservation() + " limit: " + limit); + try { + readLock.lock(); + User user = getUser(userName); + + currentResourceLimits.setAmountNeededUnreserve(Resources.none()); + + // Note: We aren't considering the current request since there is a fixed + // overhead of the AM, but it's a > check, not a >= check, so... + if (Resources.greaterThan(resourceCalculator, clusterResource, + user.getUsed(nodePartition), limit)) { + // if enabled, check to see if could we potentially use this node instead + // of a reserved node if the application has reserved containers + if (this.reservationsContinueLooking && nodePartition.equals( + CommonNodeLabelsManager.NO_LABEL)) { + if (Resources.lessThanOrEqual(resourceCalculator, clusterResource, + Resources.subtract(user.getUsed(), + application.getCurrentReservation()), limit)) { + + if (LOG.isDebugEnabled()) { + LOG.debug("User " + userName + " in queue " + getQueueName() + + " will exceed limit based on reservations - " + + " consumed: " + user.getUsed() + " reserved: " + application + .getCurrentReservation() + " limit: " + limit); + } + Resource amountNeededToUnreserve = Resources.subtract( + user.getUsed(nodePartition), limit); + // we can only acquire a new container if we unreserve first to + // respect user-limit + currentResourceLimits.setAmountNeededUnreserve( + amountNeededToUnreserve); + return true; } - Resource amountNeededToUnreserve = - Resources.subtract(user.getUsed(nodePartition), limit); - // we can only acquire a new container if we unreserve first to - // respect user-limit - currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve); - return true; } + if (LOG.isDebugEnabled()) { + LOG.debug("User " + userName + " in queue " + getQueueName() + + " will exceed limit - " + " consumed: " + user + .getUsed(nodePartition) + " limit: " + limit); + } + return false; } - if (LOG.isDebugEnabled()) { - LOG.debug("User " + userName + " in queue " + getQueueName() - + " will exceed limit - " + " consumed: " - + user.getUsed(nodePartition) + " limit: " + limit); - } - return false; + return true; + } finally { + readLock.unlock(); } - return true; } @Override @@ -1340,15 +1432,15 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) { boolean removed = false; Priority priority = null; - - synchronized (this) { + + try { + writeLock.lock(); if (rmContainer.getContainer() != null) { priority = rmContainer.getContainer().getPriority(); } if (null != priority) { - removed = app.unreserve( - rmContainer.getAllocatedSchedulerKey(), node, + removed = app.unreserve(rmContainer.getAllocatedSchedulerKey(), node, rmContainer); } @@ -1359,8 +1451,10 @@ public class LeafQueue extends AbstractCSQueue { releaseResource(clusterResource, app, rmContainer.getReservedResource(), node.getPartition(), rmContainer, true); } + } finally { + writeLock.unlock(); } - + if (removed) { getParent().unreserveIncreasedContainer(clusterResource, app, node, rmContainer); @@ -1387,42 +1481,52 @@ public class LeafQueue extends AbstractCSQueue { } } - private synchronized float calculateUserUsageRatio(Resource clusterResource, + private float calculateUserUsageRatio(Resource clusterResource, String nodePartition) { - Resource resourceByLabel = - labelManager.getResourceByLabel(nodePartition, clusterResource); - float consumed = 0; - User user; - for (Map.Entry<String, User> entry : users.entrySet()) { - user = entry.getValue(); - consumed += user.resetAndUpdateUsageRatio(resourceCalculator, - resourceByLabel, nodePartition); + try { + writeLock.lock(); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, + clusterResource); + float consumed = 0; + User user; + for (Map.Entry<String, User> entry : users.entrySet()) { + user = entry.getValue(); + consumed += user.resetAndUpdateUsageRatio(resourceCalculator, + resourceByLabel, nodePartition); + } + return consumed; + } finally { + writeLock.unlock(); } - return consumed; } - private synchronized void recalculateQueueUsageRatio(Resource clusterResource, + private void recalculateQueueUsageRatio(Resource clusterResource, String nodePartition) { - ResourceUsage queueResourceUsage = this.getQueueResourceUsage(); - - if (nodePartition == null) { - for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(), - queueResourceUsage.getNodePartitionsSet())) { - qUsageRatios.setUsageRatio(partition, - calculateUserUsageRatio(clusterResource, partition)); + try { + writeLock.lock(); + ResourceUsage queueResourceUsage = this.getQueueResourceUsage(); + + if (nodePartition == null) { + for (String partition : Sets.union( + queueCapacities.getNodePartitionsSet(), + queueResourceUsage.getNodePartitionsSet())) { + qUsageRatios.setUsageRatio(partition, + calculateUserUsageRatio(clusterResource, partition)); + } + } else{ + qUsageRatios.setUsageRatio(nodePartition, + calculateUserUsageRatio(clusterResource, nodePartition)); } - } else { - qUsageRatios.setUsageRatio(nodePartition, - calculateUserUsageRatio(clusterResource, nodePartition)); + } finally { + writeLock.unlock(); } } - private synchronized void updateQueueUsageRatio(String nodePartition, + private void updateQueueUsageRatio(String nodePartition, float delta) { qUsageRatios.incUsageRatio(nodePartition, delta); } - @Override public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, @@ -1445,21 +1549,20 @@ public class LeafQueue extends AbstractCSQueue { boolean removed = false; // Careful! Locking order is important! - synchronized (this) { - + try { + writeLock.lock(); Container container = rmContainer.getContainer(); // Inform the application & the node // Note: It's safe to assume that all state changes to RMContainer - // happen under scheduler's lock... + // happen under scheduler's lock... // So, this is, in effect, a transaction across application & node if (rmContainer.getState() == RMContainerState.RESERVED) { removed = application.unreserve(rmContainer.getReservedSchedulerKey(), node, rmContainer); - } else { - removed = - application.containerCompleted(rmContainer, containerStatus, - event, node.getPartition()); + } else{ + removed = application.containerCompleted(rmContainer, containerStatus, + event, node.getPartition()); node.releaseContainer(container); } @@ -1469,12 +1572,15 @@ public class LeafQueue extends AbstractCSQueue { // Inform the ordering policy orderingPolicy.containerReleased(application, rmContainer); - + releaseResource(clusterResource, application, container.getResource(), node.getPartition(), rmContainer, false); } + } finally { + writeLock.unlock(); } + if (removed) { // Inform the parent queue _outside_ of the leaf-queue lock getParent().completedContainer(clusterResource, application, node, @@ -1487,91 +1593,104 @@ public class LeafQueue extends AbstractCSQueue { new KillableContainer(rmContainer, node.getPartition(), queueName)); } - synchronized void allocateResource(Resource clusterResource, + void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, String nodePartition, RMContainer rmContainer, boolean isIncreasedAllocation) { - super.allocateResource(clusterResource, resource, nodePartition, - isIncreasedAllocation); - Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, - clusterResource); - - // handle ignore exclusivity container - if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( - RMNodeLabelsManager.NO_LABEL) - && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { - TreeSet<RMContainer> rmContainers = null; - if (null == (rmContainers = - ignorePartitionExclusivityRMContainers.get(nodePartition))) { - rmContainers = new TreeSet<>(); - ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers); + try { + writeLock.lock(); + super.allocateResource(clusterResource, resource, nodePartition, + isIncreasedAllocation); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, + clusterResource); + + // handle ignore exclusivity container + if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals( + RMNodeLabelsManager.NO_LABEL)) { + TreeSet<RMContainer> rmContainers = null; + if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get( + nodePartition))) { + rmContainers = new TreeSet<>(); + ignorePartitionExclusivityRMContainers.put(nodePartition, + rmContainers); + } + rmContainers.add(rmContainer); } - rmContainers.add(rmContainer); - } - // Update user metrics - String userName = application.getUser(); - User user = getUser(userName); - user.assignContainer(resource, nodePartition); + // Update user metrics + String userName = application.getUser(); - // Update usage ratios - updateQueueUsageRatio(nodePartition, - user.updateUsageRatio(resourceCalculator, resourceByLabel, - nodePartition)); + // TODO, should use getUser, use this method just to avoid UT failure + // which is caused by wrong invoking order, will fix UT separately + User user = getUserAndAddIfAbsent(userName); - // Note this is a bit unconventional since it gets the object and modifies - // it here, rather then using set routine - Resources.subtractFrom(application.getHeadroom(), resource); // headroom - metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); - - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + - " user=" + userName + - " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + - " headroom = " + application.getHeadroom() + - " user-resources=" + user.getUsed() - ); + user.assignContainer(resource, nodePartition); + + // Update usage ratios + updateQueueUsageRatio(nodePartition, + user.updateUsageRatio(resourceCalculator, resourceByLabel, + nodePartition)); + + // Note this is a bit unconventional since it gets the object and modifies + // it here, rather then using set routine + Resources.subtractFrom(application.getHeadroom(), resource); // headroom + metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); + + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage + .getUsed() + " numContainers=" + numContainers + " headroom = " + + application.getHeadroom() + " user-resources=" + user.getUsed()); + } + } finally { + writeLock.unlock(); } } - synchronized void releaseResource(Resource clusterResource, + void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource, String nodePartition, RMContainer rmContainer, boolean isChangeResource) { - super.releaseResource(clusterResource, resource, nodePartition, - isChangeResource); - Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, - clusterResource); - - // handle ignore exclusivity container - if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( - RMNodeLabelsManager.NO_LABEL) - && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { - if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) { - Set<RMContainer> rmContainers = - ignorePartitionExclusivityRMContainers.get(nodePartition); - rmContainers.remove(rmContainer); - if (rmContainers.isEmpty()) { - ignorePartitionExclusivityRMContainers.remove(nodePartition); + try { + writeLock.lock(); + super.releaseResource(clusterResource, resource, nodePartition, + isChangeResource); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, + clusterResource); + + // handle ignore exclusivity container + if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals( + RMNodeLabelsManager.NO_LABEL)) { + if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) { + Set<RMContainer> rmContainers = + ignorePartitionExclusivityRMContainers.get(nodePartition); + rmContainers.remove(rmContainer); + if (rmContainers.isEmpty()) { + ignorePartitionExclusivityRMContainers.remove(nodePartition); + } } } - } - // Update user metrics - String userName = application.getUser(); - User user = getUser(userName); - user.releaseContainer(resource, nodePartition); + // Update user metrics + String userName = application.getUser(); + User user = getUserAndAddIfAbsent(userName); + user.releaseContainer(resource, nodePartition); - // Update usage ratios - updateQueueUsageRatio(nodePartition, - user.updateUsageRatio(resourceCalculator, resourceByLabel, - nodePartition)); + // Update usage ratios + updateQueueUsageRatio(nodePartition, + user.updateUsageRatio(resourceCalculator, resourceByLabel, + nodePartition)); - metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); + metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + - " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + - " user=" + userName + " user-resources=" + user.getUsed()); + if (LOG.isDebugEnabled()) { + LOG.debug( + getQueueName() + " used=" + queueUsage.getUsed() + " numContainers=" + + numContainers + " user=" + userName + " user-resources=" + + user.getUsed()); + } + } finally { + writeLock.unlock(); } } @@ -1596,35 +1715,38 @@ public class LeafQueue extends AbstractCSQueue { } @Override - public synchronized void updateClusterResource(Resource clusterResource, + public void updateClusterResource(Resource clusterResource, ResourceLimits currentResourceLimits) { - updateCurrentResourceLimits(currentResourceLimits, clusterResource); - lastClusterResource = clusterResource; - - // Update headroom info based on new cluster resource value - // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity - // during allocation - setQueueResourceLimitsInfo(clusterResource); + try { + writeLock.lock(); + updateCurrentResourceLimits(currentResourceLimits, clusterResource); + lastClusterResource = clusterResource; - // Update user consumedRatios - recalculateQueueUsageRatio(clusterResource, null); + // Update headroom info based on new cluster resource value + // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity + // during allocation + setQueueResourceLimitsInfo(clusterResource); - // Update metrics - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, null); + // Update user consumedRatios + recalculateQueueUsageRatio(clusterResource, null); - // queue metrics are updated, more resource may be available - // activate the pending applications if possible - activateApplications(); + // Update metrics + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); - // Update application properties - for (FiCaSchedulerApp application : - orderingPolicy.getSchedulableEntities()) { - synchronized (application) { + // queue metrics are updated, more resource may be available + // activate the pending applications if possible + activateApplications(); + + // Update application properties + for (FiCaSchedulerApp application : orderingPolicy + .getSchedulableEntities()) { computeUserLimitAndSetHeadroom(application, clusterResource, RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } + } finally { + writeLock.unlock(); } } @@ -1721,30 +1843,47 @@ public class LeafQueue extends AbstractCSQueue { public static class User { ResourceUsage userResourceUsage = new ResourceUsage(); volatile Resource userResourceLimit = Resource.newInstance(0, 0); - int pendingApplications = 0; - int activeApplications = 0; + volatile int pendingApplications = 0; + volatile int activeApplications = 0; private UsageRatios userUsageRatios = new UsageRatios(); + private WriteLock writeLock; + + User() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + // Nobody uses read-lock now, will add it when necessary + writeLock = lock.writeLock(); + } public ResourceUsage getResourceUsage() { return userResourceUsage; } - public synchronized float resetAndUpdateUsageRatio( + public float resetAndUpdateUsageRatio( ResourceCalculator resourceCalculator, Resource resource, String nodePartition) { - userUsageRatios.setUsageRatio(nodePartition, 0); - return updateUsageRatio(resourceCalculator, resource, nodePartition); + try { + writeLock.lock(); + userUsageRatios.setUsageRatio(nodePartition, 0); + return updateUsageRatio(resourceCalculator, resource, nodePartition); + } finally { + writeLock.unlock(); + } } - public synchronized float updateUsageRatio( + public float updateUsageRatio( ResourceCalculator resourceCalculator, Resource resource, String nodePartition) { - float delta; - float newRatio = - Resources.ratio(resourceCalculator, getUsed(nodePartition), resource); - delta = newRatio - userUsageRatios.getUsageRatio(nodePartition); - userUsageRatios.setUsageRatio(nodePartition, newRatio); - return delta; + try { + writeLock.lock(); + float delta; + float newRatio = Resources.ratio(resourceCalculator, + getUsed(no
<TRUNCATED> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org