YARN-3139. Improve locks in AbstractYarnScheduler/CapacityScheduler/FairScheduler. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/caafa980 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/caafa980 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/caafa980 Branch: refs/heads/branch-2 Commit: caafa980af9a19427855df1d4b1d5b7681c3944e Parents: 69c1ab4 Author: Jian He <jia...@apache.org> Authored: Thu Oct 6 07:54:22 2016 -0700 Committer: Jian He <jia...@apache.org> Committed: Thu Oct 6 07:55:14 2016 -0700 ---------------------------------------------------------------------- .../server/resourcemanager/RMServerUtils.java | 5 +- .../scheduler/AbstractYarnScheduler.java | 416 +++-- .../scheduler/SchedulerApplicationAttempt.java | 8 +- .../scheduler/capacity/CapacityScheduler.java | 1731 ++++++++++-------- .../scheduler/capacity/LeafQueue.java | 17 + .../scheduler/common/fica/FiCaSchedulerApp.java | 16 +- .../scheduler/fair/FairScheduler.java | 1048 ++++++----- 7 files changed, 1755 insertions(+), 1486 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/caafa980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index b90e499..b2a085a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -211,10 +211,7 @@ public class RMServerUtils { } /** - * Validate increase/decrease request. This function must be called under - * the queue lock to make sure that the access to container resource is - * atomic. Refer to LeafQueue.decreaseContainer() and - * CapacityScheduelr.updateIncreaseRequests() + * Validate increase/decrease request. * <pre> * - Throw exception when any other error happens * </pre> http://git-wip-us.apache.org/repos/asf/hadoop/blob/caafa980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 45415de..645e06d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -72,8 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -94,7 +93,7 @@ public abstract class AbstractYarnScheduler protected Resource minimumAllocation; - protected RMContext rmContext; + protected volatile RMContext rmContext; private volatile Priority maxClusterLevelAppPriority; @@ -112,6 +111,18 @@ public abstract class AbstractYarnScheduler protected static final Allocation EMPTY_ALLOCATION = new Allocation( EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); + protected final ReentrantReadWriteLock.ReadLock readLock; + + /* + * Use writeLock for any of operations below: + * - queue change (hierarchy / configuration / container allocation) + * - application(add/remove/allocate-container, but not include container + * finish) + * - node (add/remove/change-resource/container-allocation, but not include + * container finish) + */ + protected final ReentrantReadWriteLock.WriteLock writeLock; + /** * Construct the service. * @@ -119,6 +130,9 @@ public abstract class AbstractYarnScheduler */ public AbstractYarnScheduler(String name) { super(name); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); } @Override @@ -141,6 +155,10 @@ public abstract class AbstractYarnScheduler return nodeTracker; } + /* + * YARN-3136 removed synchronized lock for this method for performance + * purposes + */ public List<Container> getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); @@ -155,9 +173,8 @@ public abstract class AbstractYarnScheduler } Collection<RMContainer> liveContainers = app.getCurrentAppAttempt().getLiveContainers(); - ContainerId amContainerId = - rmContext.getRMApps().get(appId).getCurrentAppAttempt() - .getMasterContainer().getId(); + ContainerId amContainerId = rmContext.getRMApps().get(appId) + .getCurrentAppAttempt().getMasterContainer().getId(); for (RMContainer rmContainer : liveContainers) { if (!rmContainer.getContainerId().equals(amContainerId)) { containerList.add(rmContainer.getContainer()); @@ -211,54 +228,59 @@ public abstract class AbstractYarnScheduler nodeTracker.setConfiguredMaxAllocation(maximumAllocation); } - protected synchronized void containerLaunchedOnNode( + protected void containerLaunchedOnNode( ContainerId containerId, SchedulerNode node) { - // Get the application for the finished container - SchedulerApplicationAttempt application = - getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " + containerId.getApplicationAttemptId() - .getApplicationId() + " launched container " + containerId - + " on node: " + node); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); - return; - } + try { + readLock.lock(); + // Get the application for the finished container + SchedulerApplicationAttempt application = getCurrentAttemptForContainer( + containerId); + if (application == null) { + LOG.info("Unknown application " + containerId.getApplicationAttemptId() + .getApplicationId() + " launched container " + containerId + + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; + } - application.containerLaunchedOnNode(containerId, node.getNodeID()); + application.containerLaunchedOnNode(containerId, node.getNodeID()); + } finally { + readLock.unlock(); + } } protected void containerIncreasedOnNode(ContainerId containerId, SchedulerNode node, Container increasedContainerReportedByNM) { + /* + * No lock is required, as this method is protected by scheduler's writeLock + */ // Get the application for the finished container - SchedulerApplicationAttempt application = - getCurrentAttemptForContainer(containerId); + SchedulerApplicationAttempt application = getCurrentAttemptForContainer( + containerId); if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " increased container " + containerId + " on node: " + node); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + LOG.info("Unknown application " + containerId.getApplicationAttemptId() + .getApplicationId() + " increased container " + containerId + + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); return; } - LeafQueue leafQueue = (LeafQueue) application.getQueue(); - synchronized (leafQueue) { - RMContainer rmContainer = getRMContainer(containerId); - if (rmContainer == null) { - // Some unknown container sneaked into the system. Kill it. - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent( - node.getNodeID(), containerId)); - return; - } - rmContainer.handle(new RMContainerNMDoneChangeResourceEvent( - containerId, increasedContainerReportedByNM.getResource())); + + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; } + rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId, + increasedContainerReportedByNM.getResource())); } public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { - SchedulerApplication<T> app = - applications.get(applicationAttemptId.getApplicationId()); + SchedulerApplication<T> app = applications.get( + applicationAttemptId.getApplicationId()); return app == null ? null : app.getCurrentAppAttempt(); } @@ -338,96 +360,101 @@ public abstract class AbstractYarnScheduler } } - public synchronized void recoverContainersOnNode( + public void recoverContainersOnNode( List<NMContainerStatus> containerReports, RMNode nm) { - if (!rmContext.isWorkPreservingRecoveryEnabled() - || containerReports == null - || (containerReports != null && containerReports.isEmpty())) { - return; - } - - for (NMContainerStatus container : containerReports) { - ApplicationId appId = - container.getContainerId().getApplicationAttemptId().getApplicationId(); - RMApp rmApp = rmContext.getRMApps().get(appId); - if (rmApp == null) { - LOG.error("Skip recovering container " + container - + " for unknown application."); - killOrphanContainerOnNode(nm, container); - continue; + try { + writeLock.lock(); + if (!rmContext.isWorkPreservingRecoveryEnabled() + || containerReports == null || (containerReports != null + && containerReports.isEmpty())) { + return; } - SchedulerApplication<T> schedulerApp = applications.get(appId); - if (schedulerApp == null) { - LOG.info("Skip recovering container " + container - + " for unknown SchedulerApplication. Application current state is " - + rmApp.getState()); - killOrphanContainerOnNode(nm, container); - continue; - } + for (NMContainerStatus container : containerReports) { + ApplicationId appId = + container.getContainerId().getApplicationAttemptId() + .getApplicationId(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.error("Skip recovering container " + container + + " for unknown application."); + killOrphanContainerOnNode(nm, container); + continue; + } - LOG.info("Recovering container " + container); - SchedulerApplicationAttempt schedulerAttempt = - schedulerApp.getCurrentAppAttempt(); - - if (!rmApp.getApplicationSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { - // Do not recover containers for stopped attempt or previous attempt. - if (schedulerAttempt.isStopped() - || !schedulerAttempt.getApplicationAttemptId().equals( - container.getContainerId().getApplicationAttemptId())) { - LOG.info("Skip recovering container " + container - + " for already stopped attempt."); + SchedulerApplication<T> schedulerApp = applications.get(appId); + if (schedulerApp == null) { + LOG.info("Skip recovering container " + container + + " for unknown SchedulerApplication. " + + "Application current state is " + rmApp.getState()); killOrphanContainerOnNode(nm, container); continue; } - } - // create container - RMContainer rmContainer = recoverAndCreateContainer(container, nm); - - // recover RMContainer - rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(), - container)); - - // recover scheduler node - SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID()); - schedulerNode.recoverContainer(rmContainer); - - // recover queue: update headroom etc. - Queue queue = schedulerAttempt.getQueue(); - queue.recoverContainer( - getClusterResource(), schedulerAttempt, rmContainer); - - // recover scheduler attempt - schedulerAttempt.recoverContainer(schedulerNode, rmContainer); - - // set master container for the current running AMContainer for this - // attempt. - RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt(); - if (appAttempt != null) { - Container masterContainer = appAttempt.getMasterContainer(); - - // Mark current running AMContainer's RMContainer based on the master - // container ID stored in AppAttempt. - if (masterContainer != null - && masterContainer.getId().equals(rmContainer.getContainerId())) { - ((RMContainerImpl)rmContainer).setAMContainer(true); + LOG.info("Recovering container " + container); + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + + if (!rmApp.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + // Do not recover containers for stopped attempt or previous attempt. + if (schedulerAttempt.isStopped() || !schedulerAttempt + .getApplicationAttemptId().equals( + container.getContainerId().getApplicationAttemptId())) { + LOG.info("Skip recovering container " + container + + " for already stopped attempt."); + killOrphanContainerOnNode(nm, container); + continue; + } } - } - synchronized (schedulerAttempt) { - Set<ContainerId> releases = schedulerAttempt.getPendingRelease(); - if (releases.contains(container.getContainerId())) { + // create container + RMContainer rmContainer = recoverAndCreateContainer(container, nm); + + // recover RMContainer + rmContainer.handle( + new RMContainerRecoverEvent(container.getContainerId(), container)); + + // recover scheduler node + SchedulerNode schedulerNode = nodeTracker.getNode(nm.getNodeID()); + schedulerNode.recoverContainer(rmContainer); + + // recover queue: update headroom etc. + Queue queue = schedulerAttempt.getQueue(); + queue.recoverContainer(getClusterResource(), schedulerAttempt, + rmContainer); + + // recover scheduler attempt + schedulerAttempt.recoverContainer(schedulerNode, rmContainer); + + // set master container for the current running AMContainer for this + // attempt. + RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt(); + if (appAttempt != null) { + Container masterContainer = appAttempt.getMasterContainer(); + + // Mark current running AMContainer's RMContainer based on the master + // container ID stored in AppAttempt. + if (masterContainer != null && masterContainer.getId().equals( + rmContainer.getContainerId())) { + ((RMContainerImpl) rmContainer).setAMContainer(true); + } + } + + if (schedulerAttempt.getPendingRelease().remove( + container.getContainerId())) { // release the container - rmContainer.handle(new RMContainerFinishedEvent(container - .getContainerId(), SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED)); - releases.remove(container.getContainerId()); + rmContainer.handle( + new RMContainerFinishedEvent(container.getContainerId(), + SchedulerUtils + .createAbnormalContainerStatus(container.getContainerId(), + SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED)); LOG.info(container.getContainerId() + " is released by application."); } } + } finally { + writeLock.unlock(); } } @@ -492,17 +519,15 @@ public abstract class AbstractYarnScheduler for (SchedulerApplication<T> app : applications.values()) { T attempt = app.getCurrentAppAttempt(); if (attempt != null) { - synchronized (attempt) { - for (ContainerId containerId : attempt.getPendingRelease()) { - RMAuditLogger.logFailure(app.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "Scheduler", - "Trying to release container not owned by app " - + "or with invalid id.", attempt.getApplicationId(), - containerId, null); - } - attempt.getPendingRelease().clear(); + for (ContainerId containerId : attempt.getPendingRelease()) { + RMAuditLogger.logFailure(app.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", "Scheduler", + "Trying to release container not owned by app " + + "or with invalid id.", attempt.getApplicationId(), + containerId, null); } + attempt.getPendingRelease().clear(); } } } @@ -558,9 +583,7 @@ public abstract class AbstractYarnScheduler < nmExpireInterval) { LOG.info(containerId + " doesn't exist. Add the container" + " to the release request cache as it maybe on recovery."); - synchronized (attempt) { - attempt.getPendingRelease().add(containerId); - } + attempt.getPendingRelease().add(containerId); } else { RMAuditLogger.logFailure(attempt.getUser(), AuditConstants.RELEASE_CONTAINER, @@ -603,81 +626,92 @@ public abstract class AbstractYarnScheduler } @Override - public synchronized void moveAllApps(String sourceQueue, String destQueue) + public void moveAllApps(String sourceQueue, String destQueue) throws YarnException { - // check if destination queue is a valid leaf queue try { - getQueueInfo(destQueue, false, false); - } catch (IOException e) { - LOG.warn(e); - throw new YarnException(e); - } - // check if source queue is a valid - List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue); - if (apps == null) { - String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist"; - LOG.warn(errMsg); - throw new YarnException(errMsg); - } - // generate move events for each pending/running app - for (ApplicationAttemptId app : apps) { - SettableFuture<Object> future = SettableFuture.create(); - this.rmContext - .getDispatcher() - .getEventHandler() - .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); + writeLock.lock(); + // check if destination queue is a valid leaf queue + try { + getQueueInfo(destQueue, false, false); + } catch (IOException e) { + LOG.warn(e); + throw new YarnException(e); + } + // check if source queue is a valid + List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue); + if (apps == null) { + String errMsg = + "The specified Queue: " + sourceQueue + " doesn't exist"; + LOG.warn(errMsg); + throw new YarnException(errMsg); + } + // generate move events for each pending/running app + for (ApplicationAttemptId app : apps) { + SettableFuture<Object> future = SettableFuture.create(); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); + } + } finally { + writeLock.unlock(); } } @Override - public synchronized void killAllAppsInQueue(String queueName) + public void killAllAppsInQueue(String queueName) throws YarnException { - // check if queue is a valid - List<ApplicationAttemptId> apps = getAppsInQueue(queueName); - if (apps == null) { - String errMsg = "The specified Queue: " + queueName + " doesn't exist"; - LOG.warn(errMsg); - throw new YarnException(errMsg); - } - // generate kill events for each pending/running app - for (ApplicationAttemptId app : apps) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL, - "Application killed due to expiry of reservation queue " + - queueName + ".")); + try { + writeLock.lock(); + // check if queue is a valid + List<ApplicationAttemptId> apps = getAppsInQueue(queueName); + if (apps == null) { + String errMsg = "The specified Queue: " + queueName + " doesn't exist"; + LOG.warn(errMsg); + throw new YarnException(errMsg); + } + // generate kill events for each pending/running app + for (ApplicationAttemptId app : apps) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL, + "Application killed due to expiry of reservation queue " + + queueName + ".")); + } + } finally { + writeLock.unlock(); } } /** * Process resource update on a node. */ - public synchronized void updateNodeResource(RMNode nm, + public void updateNodeResource(RMNode nm, ResourceOption resourceOption) { - SchedulerNode node = getSchedulerNode(nm.getNodeID()); - Resource newResource = resourceOption.getResource(); - Resource oldResource = node.getTotalResource(); - if(!oldResource.equals(newResource)) { - // Notify NodeLabelsManager about this change - rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(), - newResource); - - // Log resource change - LOG.info("Update resource on node: " + node.getNodeName() - + " from: " + oldResource + ", to: " - + newResource); - - nodeTracker.removeNode(nm.getNodeID()); - - // update resource to node - node.updateTotalResource(newResource); - - nodeTracker.addNode((N) node); - } else { - // Log resource change - LOG.warn("Update resource on node: " + node.getNodeName() - + " with the same resource: " + newResource); + try { + writeLock.lock(); + SchedulerNode node = getSchedulerNode(nm.getNodeID()); + Resource newResource = resourceOption.getResource(); + Resource oldResource = node.getTotalResource(); + if (!oldResource.equals(newResource)) { + // Notify NodeLabelsManager about this change + rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(), + newResource); + + // Log resource change + LOG.info("Update resource on node: " + node.getNodeName() + " from: " + + oldResource + ", to: " + newResource); + + nodeTracker.removeNode(nm.getNodeID()); + + // update resource to node + node.updateTotalResource(newResource); + + nodeTracker.addNode((N) node); + } else{ + // Log resource change + LOG.warn("Update resource on node: " + node.getNodeName() + + " with the same resource: " + newResource); + } + } finally { + writeLock.unlock(); } } @@ -735,7 +769,7 @@ public abstract class AbstractYarnScheduler } @Override - public synchronized void setClusterMaxPriority(Configuration conf) + public void setClusterMaxPriority(Configuration conf) throws YarnException { try { maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/caafa980/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index adc3a97..0f4ad11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -178,7 +179,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { new AppSchedulingInfo(applicationAttemptId, user, queue, activeUsersManager, rmContext.getEpoch(), attemptResourceUsage); this.queue = queue; - this.pendingRelease = new HashSet<ContainerId>(); + this.pendingRelease = Collections.newSetFromMap( + new ConcurrentHashMap<ContainerId, Boolean>()); this.attemptId = applicationAttemptId; if (rmContext.getRMApps() != null && rmContext.getRMApps() @@ -1153,6 +1155,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { // queue's resource usage for specific partition } + public ReentrantReadWriteLock.WriteLock getWriteLock() { + return writeLock; + } + @Override public boolean isRecovering() { return isAttemptRecovering; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org