YARN-3140. Improve locks in AbstractCSQueue/LeafQueue/ParentQueue. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b66d9ec Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b66d9ec Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b66d9ec Branch: refs/heads/trunk Commit: 2b66d9ec5bdaec7e6b278926fbb6f222c4e3afaa Parents: e52d6e7 Author: Jian He <jia...@apache.org> Authored: Tue Sep 20 15:03:07 2016 +0800 Committer: Jian He <jia...@apache.org> Committed: Tue Sep 20 15:03:31 2016 +0800 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 10 + .../scheduler/capacity/AbstractCSQueue.java | 378 ++-- .../scheduler/capacity/LeafQueue.java | 1819 ++++++++++-------- .../scheduler/capacity/ParentQueue.java | 825 ++++---- .../scheduler/capacity/PlanQueue.java | 122 +- .../scheduler/capacity/ReservationQueue.java | 67 +- .../capacity/TestContainerResizing.java | 4 +- 7 files changed, 1787 insertions(+), 1438 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index a5c0f71..01b1da7 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -564,4 +564,14 @@     </Or>     <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />   </Match> + + <!-- Ignore VO_VOLATILE_INCREMENT, they will be protected by writeLock --> + <Match> + <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User" /> +    <Or> + <Field name="pendingApplications" /> + <Field name="activeApplications" /> +    </Or> + <Bug 
pattern="VO_VOLATILE_INCREMENT" /> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b66d9ec/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 1d8f929..096f5ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -60,25 +61,25 @@ import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); - CSQueue parent; + volatile CSQueue parent; final String queueName; volatile int numContainers; final Resource minimumAllocation; volatile Resource maximumAllocation; - QueueState state; + volatile QueueState state; final CSQueueMetrics metrics; protected final PrivilegedEntity queueEntity; final ResourceCalculator resourceCalculator; Set<String> accessibleLabels; - 
RMNodeLabelsManager labelManager; + final RMNodeLabelsManager labelManager; String defaultLabelExpression; Map<AccessType, AccessControlList> acls = new HashMap<AccessType, AccessControlList>(); volatile boolean reservationsContinueLooking; - private boolean preemptionDisabled; + private volatile boolean preemptionDisabled; // Track resource usage-by-label like used-resource/pending-resource, etc. volatile ResourceUsage queueUsage; @@ -94,6 +95,9 @@ public abstract class AbstractCSQueue implements CSQueue { protected ActivitiesManager activitiesManager; + protected ReentrantReadWriteLock.ReadLock readLock; + protected ReentrantReadWriteLock.WriteLock writeLock; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); @@ -116,7 +120,11 @@ public abstract class AbstractCSQueue implements CSQueue { queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath()); // initialize QueueCapacities - queueCapacities = new QueueCapacities(parent == null); + queueCapacities = new QueueCapacities(parent == null); + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); } protected void setupConfigurableCapacities() { @@ -128,12 +136,12 @@ public abstract class AbstractCSQueue implements CSQueue { } @Override - public synchronized float getCapacity() { + public float getCapacity() { return queueCapacities.getCapacity(); } @Override - public synchronized float getAbsoluteCapacity() { + public float getAbsoluteCapacity() { return queueCapacities.getAbsoluteCapacity(); } @@ -167,7 +175,7 @@ public abstract class AbstractCSQueue implements CSQueue { } @Override - public synchronized QueueState getState() { + public QueueState getState() { return state; } @@ -187,13 +195,13 @@ public abstract class AbstractCSQueue implements CSQueue { } @Override - public synchronized CSQueue getParent() 
{ + public CSQueue getParent() { return parent; } @Override - public synchronized void setParent(CSQueue newParentQueue) { - this.parent = (ParentQueue)newParentQueue; + public void setParent(CSQueue newParentQueue) { + this.parent = newParentQueue; } public Set<String> getAccessibleNodeLabels() { @@ -221,18 +229,22 @@ public abstract class AbstractCSQueue implements CSQueue { * Set maximum capacity - used only for testing. * @param maximumCapacity new max capacity */ - synchronized void setMaxCapacity(float maximumCapacity) { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), - queueCapacities.getCapacity(), maximumCapacity); - float absMaxCapacity = - CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); - CSQueueUtils.checkAbsoluteCapacity(getQueueName(), - queueCapacities.getAbsoluteCapacity(), - absMaxCapacity); - - queueCapacities.setMaximumCapacity(maximumCapacity); - queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity); + void setMaxCapacity(float maximumCapacity) { + try { + writeLock.lock(); + // Sanity check + CSQueueUtils.checkMaxCapacity(getQueueName(), + queueCapacities.getCapacity(), maximumCapacity); + float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity( + maximumCapacity, parent); + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), + queueCapacities.getAbsoluteCapacity(), absMaxCapacity); + + queueCapacities.setMaximumCapacity(maximumCapacity); + queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity); + } finally { + writeLock.unlock(); + } } @Override @@ -240,70 +252,82 @@ public abstract class AbstractCSQueue implements CSQueue { return defaultLabelExpression; } - synchronized void setupQueueConfigs(Resource clusterResource) + void setupQueueConfigs(Resource clusterResource) throws IOException { - // get labels - this.accessibleLabels = - csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath()); - this.defaultLabelExpression = csContext.getConfiguration() - 
.getDefaultNodeLabelExpression(getQueuePath()); - - // inherit from parent if labels not set - if (this.accessibleLabels == null && parent != null) { - this.accessibleLabels = parent.getAccessibleNodeLabels(); - } - - // inherit from parent if labels not set - if (this.defaultLabelExpression == null && parent != null - && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) { - this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); - } + try { + writeLock.lock(); + // get labels + this.accessibleLabels = + csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath()); + this.defaultLabelExpression = + csContext.getConfiguration().getDefaultNodeLabelExpression( + getQueuePath()); + + // inherit from parent if labels not set + if (this.accessibleLabels == null && parent != null) { + this.accessibleLabels = parent.getAccessibleNodeLabels(); + } - // After we setup labels, we can setup capacities - setupConfigurableCapacities(); - - this.maximumAllocation = - csContext.getConfiguration().getMaximumAllocationPerQueue( - getQueuePath()); - - authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); - - this.state = csContext.getConfiguration().getState(getQueuePath()); - this.acls = csContext.getConfiguration().getAcls(getQueuePath()); + // inherit from parent if labels not set + if (this.defaultLabelExpression == null && parent != null + && this.accessibleLabels.containsAll( + parent.getAccessibleNodeLabels())) { + this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); + } - // Update metrics - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, null); - - // Check if labels of this queue is a subset of parent queue, only do this - // when we not root - if (parent != null && parent.getParent() != null) { - if (parent.getAccessibleNodeLabels() != null - && !parent.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { - // if parent 
isn't "*", child shouldn't be "*" too - if (this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { - throw new IOException("Parent's accessible queue is not ANY(*), " - + "but child's accessible queue is *"); - } else { - Set<String> diff = - Sets.difference(this.getAccessibleNodeLabels(), - parent.getAccessibleNodeLabels()); - if (!diff.isEmpty()) { - throw new IOException("Some labels of child queue is not a subset " - + "of parent queue, these labels=[" - + StringUtils.join(diff, ",") + "]"); + // After we setup labels, we can setup capacities + setupConfigurableCapacities(); + + this.maximumAllocation = + csContext.getConfiguration().getMaximumAllocationPerQueue( + getQueuePath()); + + authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); + + this.state = csContext.getConfiguration().getState(getQueuePath()); + this.acls = csContext.getConfiguration().getAcls(getQueuePath()); + + // Update metrics + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); + + // Check if labels of this queue is a subset of parent queue, only do this + // when we not root + if (parent != null && parent.getParent() != null) { + if (parent.getAccessibleNodeLabels() != null && !parent + .getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { + // if parent isn't "*", child shouldn't be "*" too + if (this.getAccessibleNodeLabels().contains( + RMNodeLabelsManager.ANY)) { + throw new IOException("Parent's accessible queue is not ANY(*), " + + "but child's accessible queue is *"); + } else{ + Set<String> diff = Sets.difference(this.getAccessibleNodeLabels(), + parent.getAccessibleNodeLabels()); + if (!diff.isEmpty()) { + throw new IOException( + "Some labels of child queue is not a subset " + + "of parent queue, these labels=[" + StringUtils + .join(diff, ",") + "]"); + } } } } - } - this.reservationsContinueLooking = csContext.getConfiguration() - .getReservationContinueLook(); + 
this.reservationsContinueLooking = + csContext.getConfiguration().getReservationContinueLook(); - this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); + this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); + } finally { + writeLock.unlock(); + } } - + protected QueueInfo getQueueInfo() { + // Deliberately doesn't use lock here, because this method will be invoked + // from schedulerApplicationAttempt, to avoid deadlock, sacrifice + // consistency here. + // TODO, improve this QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(queueName); queueInfo.setAccessibleNodeLabels(accessibleLabels); @@ -318,8 +342,12 @@ public abstract class AbstractCSQueue implements CSQueue { } public QueueStatistics getQueueStatistics() { - QueueStatistics stats = - recordFactory.newRecordInstance(QueueStatistics.class); + // Deliberately doesn't use lock here, because this method will be invoked + // from schedulerApplicationAttempt, to avoid deadlock, sacrifice + // consistency here. 
+ // TODO, improve this + QueueStatistics stats = recordFactory.newRecordInstance( + QueueStatistics.class); stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted()); stats.setNumAppsRunning(getMetrics().getAppsRunning()); stats.setNumAppsPending(getMetrics().getAppsPending()); @@ -351,26 +379,36 @@ public abstract class AbstractCSQueue implements CSQueue { return minimumAllocation; } - synchronized void allocateResource(Resource clusterResource, + void allocateResource(Resource clusterResource, Resource resource, String nodePartition, boolean changeContainerResource) { - queueUsage.incUsed(nodePartition, resource); + try { + writeLock.lock(); + queueUsage.incUsed(nodePartition, resource); - if (!changeContainerResource) { - ++numContainers; + if (!changeContainerResource) { + ++numContainers; + } + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, nodePartition); + } finally { + writeLock.unlock(); } - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, nodePartition); } - protected synchronized void releaseResource(Resource clusterResource, + protected void releaseResource(Resource clusterResource, Resource resource, String nodePartition, boolean changeContainerResource) { - queueUsage.decUsed(nodePartition, resource); + try { + writeLock.lock(); + queueUsage.decUsed(nodePartition, resource); - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, nodePartition); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, nodePartition); - if (!changeContainerResource) { - --numContainers; + if (!changeContainerResource) { + --numContainers; + } + } finally { + writeLock.unlock(); } } @@ -381,7 +419,13 @@ public abstract class AbstractCSQueue implements CSQueue { @Private public Map<AccessType, AccessControlList> getACLs() { - 
return acls; + try { + readLock.lock(); + return acls; + } finally { + readLock.unlock(); + } + } @Private @@ -464,86 +508,88 @@ public abstract class AbstractCSQueue implements CSQueue { minimumAllocation); } - synchronized boolean canAssignToThisQueue(Resource clusterResource, + boolean canAssignToThisQueue(Resource clusterResource, String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { - // Get current limited resource: - // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect - // queues' max capacity. - // - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect - // queue's max capacity, queue's max capacity on the partition will be - // considered to be 100%. Which is a queue can use all resource in the - // partition. - // Doing this because: for non-exclusive allocation, we make sure there's - // idle resource on the partition, to avoid wastage, such resource will be - // leveraged as much as we can, and preemption policy will reclaim it back - // when partitoned-resource-request comes back. 
- Resource currentLimitResource = - getCurrentLimitResource(nodePartition, clusterResource, - currentResourceLimits, schedulingMode); - - Resource nowTotalUsed = queueUsage.getUsed(nodePartition); - - // Set headroom for currentResourceLimits: - // When queue is a parent queue: Headroom = limit - used + killable - // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself) - Resource usedExceptKillable = nowTotalUsed; - if (null != getChildQueues() && !getChildQueues().isEmpty()) { - usedExceptKillable = Resources.subtract(nowTotalUsed, - getTotalKillableResource(nodePartition)); - } - currentResourceLimits.setHeadroom( - Resources.subtract(currentLimitResource, usedExceptKillable)); - - if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, - usedExceptKillable, currentLimitResource)) { - - // if reservation continous looking enabled, check to see if could we - // potentially use this node instead of a reserved node if the application - // has reserved containers. 
- // TODO, now only consider reservation cases when the node has no label - if (this.reservationsContinueLooking - && nodePartition.equals(RMNodeLabelsManager.NO_LABEL) - && Resources.greaterThan(resourceCalculator, clusterResource, - resourceCouldBeUnreserved, Resources.none())) { - // resource-without-reserved = used - reserved - Resource newTotalWithoutReservedResource = - Resources.subtract(usedExceptKillable, resourceCouldBeUnreserved); - - // when total-used-without-reserved-resource < currentLimit, we still - // have chance to allocate on this node by unreserving some containers - if (Resources.lessThan(resourceCalculator, clusterResource, - newTotalWithoutReservedResource, currentLimitResource)) { - if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " + getQueueName() - + " usedResources: " + queueUsage.getUsed() - + ", clusterResources: " + clusterResource - + ", reservedResources: " + resourceCouldBeUnreserved - + ", capacity-without-reserved: " - + newTotalWithoutReservedResource + ", maxLimitCapacity: " - + currentLimitResource); + try { + readLock.lock(); + // Get current limited resource: + // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect + // queues' max capacity. + // - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect + // queue's max capacity, queue's max capacity on the partition will be + // considered to be 100%. Which is a queue can use all resource in the + // partition. + // Doing this because: for non-exclusive allocation, we make sure there's + // idle resource on the partition, to avoid wastage, such resource will be + // leveraged as much as we can, and preemption policy will reclaim it back + // when partitoned-resource-request comes back. 
+ Resource currentLimitResource = getCurrentLimitResource(nodePartition, + clusterResource, currentResourceLimits, schedulingMode); + + Resource nowTotalUsed = queueUsage.getUsed(nodePartition); + + // Set headroom for currentResourceLimits: + // When queue is a parent queue: Headroom = limit - used + killable + // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself) + Resource usedExceptKillable = nowTotalUsed; + if (null != getChildQueues() && !getChildQueues().isEmpty()) { + usedExceptKillable = Resources.subtract(nowTotalUsed, + getTotalKillableResource(nodePartition)); + } + currentResourceLimits.setHeadroom( + Resources.subtract(currentLimitResource, usedExceptKillable)); + + if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, + usedExceptKillable, currentLimitResource)) { + + // if reservation continous looking enabled, check to see if could we + // potentially use this node instead of a reserved node if the application + // has reserved containers. 
+ // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking && nodePartition.equals( + RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan( + resourceCalculator, clusterResource, resourceCouldBeUnreserved, + Resources.none())) { + // resource-without-reserved = used - reserved + Resource newTotalWithoutReservedResource = Resources.subtract( + usedExceptKillable, resourceCouldBeUnreserved); + + // when total-used-without-reserved-resource < currentLimit, we still + // have chance to allocate on this node by unreserving some containers + if (Resources.lessThan(resourceCalculator, clusterResource, + newTotalWithoutReservedResource, currentLimitResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "try to use reserved: " + getQueueName() + " usedResources: " + + queueUsage.getUsed() + ", clusterResources: " + + clusterResource + ", reservedResources: " + + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); + } + return true; } - return true; } + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + "Check assign to queue, nodePartition=" + + nodePartition + " usedResources: " + queueUsage + .getUsed(nodePartition) + " clusterResources: " + clusterResource + + " currentUsedCapacity " + Resources + .divide(resourceCalculator, clusterResource, + queueUsage.getUsed(nodePartition), labelManager + .getResourceByLabel(nodePartition, clusterResource)) + + " max-capacity: " + queueCapacities + .getAbsoluteMaximumCapacity(nodePartition) + ")"); + } + return false; } - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() - + "Check assign to queue, nodePartition=" - + nodePartition - + " usedResources: " - + queueUsage.getUsed(nodePartition) - + " clusterResources: " - + clusterResource - + " currentUsedCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(nodePartition), - 
labelManager.getResourceByLabel(nodePartition, clusterResource)) - + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")"); - } - return false; + return true; + } finally { + readLock.unlock(); } - return true; + } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org