http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f8da22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 6129772..eecd4ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -2227,6 +2227,22 @@ public class LeafQueue extends AbstractCSQueue { } } + public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app, + Priority newAppPriority) { + try { + writeLock.lock(); + FiCaSchedulerApp attempt = app.getCurrentAppAttempt(); + getOrderingPolicy().removeSchedulableEntity(attempt); + + // Update new priority in SchedulerApplication + attempt.setPriority(newAppPriority); + + getOrderingPolicy().addSchedulableEntity(attempt); + } finally { + writeLock.unlock(); + } + } + public OrderingPolicy<FiCaSchedulerApp> getPendingAppsOrderingPolicy() { return pendingOrderingPolicy;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f8da22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index fd43e74..aa7ad50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -666,6 +667,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } finally { writeLock.unlock(); } + } + public ReentrantReadWriteLock.WriteLock getWriteLock() { + return this.writeLock; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/31f8da22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 920052f..8daf0f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -186,10 +186,13 @@ public class FairScheduler extends // an app can be reserved on protected boolean sizeBasedWeight; // Give larger weights to larger jobs - protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not - protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling + // Continuous Scheduling enabled or not + protected boolean continuousSchedulingEnabled; + // Sleep time for each pass in continuous scheduling + protected volatile int continuousSchedulingSleepMs; + // Node available resource comparator private Comparator<FSSchedulerNode> nodeAvailableResourceComparator = - new NodeAvailableResourceComparator(); // Node available resource comparator + new NodeAvailableResourceComparator(); protected double nodeLocalityThreshold; // Cluster threshold for node locality protected double rackLocalityThreshold; // Cluster threshold for rack locality protected long nodeLocalityDelayMs; // Delay for node locality @@ -338,36 +341,40 @@ public class FairScheduler extends * fair shares, deficits, minimum slot allocations, and amount of used and * required resources per job. */ - protected synchronized void update() { - long start = getClock().getTime(); - updateStarvationStats(); // Determine if any queues merit preemption + protected void update() { + try { + writeLock.lock(); + long start = getClock().getTime(); + updateStarvationStats(); // Determine if any queues merit preemption - FSQueue rootQueue = queueMgr.getRootQueue(); + FSQueue rootQueue = queueMgr.getRootQueue(); - // Recursively update demands for all queues - rootQueue.updateDemand(); + // Recursively update demands for all queues + rootQueue.updateDemand(); - Resource clusterResource = getClusterResource(); - rootQueue.setFairShare(clusterResource); - // Recursively compute fair shares for all queues - // and update metrics - rootQueue.recomputeShares(); - updateRootQueueMetrics(); + Resource clusterResource = getClusterResource(); + rootQueue.setFairShare(clusterResource); + // Recursively compute fair shares for all queues + // and update metrics + rootQueue.recomputeShares(); + updateRootQueueMetrics(); - if (LOG.isDebugEnabled()) { - if (--updatesToSkipForDebug < 0) { - updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; - LOG.debug("Cluster Capacity: " + clusterResource + - " Allocations: " + rootMetrics.getAllocatedResources() + - " Availability: " + Resource.newInstance( - rootMetrics.getAvailableMB(), - rootMetrics.getAvailableVirtualCores()) + - " Demand: " + rootQueue.getDemand()); + if (LOG.isDebugEnabled()) { + if (--updatesToSkipForDebug < 0) { + updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; + LOG.debug("Cluster Capacity: " + clusterResource + " Allocations: " + + rootMetrics.getAllocatedResources() + " Availability: " + + Resource.newInstance(rootMetrics.getAvailableMB(), + rootMetrics.getAvailableVirtualCores()) + " Demand: " + rootQueue + .getDemand()); + } } - } - long duration = getClock().getTime() - start; - fsOpDurations.addUpdateCallDuration(duration); + long duration = getClock().getTime() - start; + fsOpDurations.addUpdateCallDuration(duration); + } finally { + writeLock.unlock(); + } } /** @@ -389,23 +396,28 @@ public class FairScheduler extends * such queues exist, compute how many tasks of each type need to be preempted * and then select the right ones using preemptTasks. */ - protected synchronized void preemptTasksIfNecessary() { - if (!shouldAttemptPreemption()) { - return; - } + protected void preemptTasksIfNecessary() { + try { + writeLock.lock(); + if (!shouldAttemptPreemption()) { + return; + } - long curTime = getClock().getTime(); - if (curTime - lastPreemptCheckTime < preemptionInterval) { - return; - } - lastPreemptCheckTime = curTime; + long curTime = getClock().getTime(); + if (curTime - lastPreemptCheckTime < preemptionInterval) { + return; + } + lastPreemptCheckTime = curTime; - Resource resToPreempt = Resources.clone(Resources.none()); - for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - Resources.addTo(resToPreempt, resourceDeficit(sched, curTime)); - } - if (isResourceGreaterThanNone(resToPreempt)) { - preemptResources(resToPreempt); + Resource resToPreempt = Resources.clone(Resources.none()); + for (FSLeafQueue sched : queueMgr.getLeafQueues()) { + Resources.addTo(resToPreempt, resourceDeficit(sched, curTime)); + } + if (isResourceGreaterThanNone(resToPreempt)) { + preemptResources(resToPreempt); + } + } finally { + writeLock.unlock(); } } @@ -549,22 +561,27 @@ public class FairScheduler extends return deficit; } - public synchronized RMContainerTokenSecretManager + public RMContainerTokenSecretManager getContainerTokenSecretManager() { return rmContext.getContainerTokenSecretManager(); } - // synchronized for sizeBasedWeight - public synchronized ResourceWeights getAppWeight(FSAppAttempt app) { - double weight = 1.0; - if (sizeBasedWeight) { - // Set weight based on current memory demand - weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2); + public ResourceWeights getAppWeight(FSAppAttempt app) { + try { + readLock.lock(); + double weight = 1.0; + if (sizeBasedWeight) { + // Set weight based on current memory demand + weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2); + } + weight *= app.getPriority().getPriority(); + ResourceWeights resourceWeights = app.getResourceWeights(); + resourceWeights.setWeight((float) weight); + return resourceWeights; + } finally { + readLock.unlock(); } - weight *= app.getPriority().getPriority(); - ResourceWeights resourceWeights = app.getResourceWeights(); - resourceWeights.setWeight((float)weight); - return resourceWeights; + } public Resource getIncrementResourceCapability() { @@ -595,7 +612,7 @@ public class FairScheduler extends return continuousSchedulingEnabled; } - public synchronized int getContinuousSchedulingSleepMs() { + public int getContinuousSchedulingSleepMs() { return continuousSchedulingSleepMs; } @@ -617,114 +634,123 @@ public class FairScheduler extends * user. This will accept a new app even if the user or queue is above * configured limits, but the app will not be marked as runnable. */ - protected synchronized void addApplication(ApplicationId applicationId, + protected void addApplication(ApplicationId applicationId, String queueName, String user, boolean isAppRecovering) { if (queueName == null || queueName.isEmpty()) { - String message = "Reject application " + applicationId + - " submitted by user " + user + " with an empty queue name."; + String message = + "Reject application " + applicationId + " submitted by user " + user + + " with an empty queue name."; LOG.info(message); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, message)); + rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); return; } if (queueName.startsWith(".") || queueName.endsWith(".")) { - String message = "Reject application " + applicationId - + " submitted by user " + user + " with an illegal queue name " - + queueName + ". " - + "The queue name cannot start/end with period."; + String message = + "Reject application " + applicationId + " submitted by user " + user + + " with an illegal queue name " + queueName + ". " + + "The queue name cannot start/end with period."; LOG.info(message); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, message)); + rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); return; } - RMApp rmApp = rmContext.getRMApps().get(applicationId); - FSLeafQueue queue = assignToQueue(rmApp, queueName, user); - if (queue == null) { - return; - } + try { + writeLock.lock(); + RMApp rmApp = rmContext.getRMApps().get(applicationId); + FSLeafQueue queue = assignToQueue(rmApp, queueName, user); + if (queue == null) { + return; + } - // Enforce ACLs - UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); + // Enforce ACLs + UserGroupInformation userUgi = UserGroupInformation.createRemoteUser( + user); + + if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) && !queue + .hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { + String msg = "User " + userUgi.getUserName() + + " cannot submit applications to queue " + queue.getName() + + "(requested queuename is " + queueName + ")"; + LOG.info(msg); + rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, msg)); + return; + } - if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi) - && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) { - String msg = "User " + userUgi.getUserName() + - " cannot submit applications to queue " + queue.getName() + - "(requested queuename is " + queueName + ")"; - LOG.info(msg); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, msg)); - return; - } - - SchedulerApplication<FSAppAttempt> application = - new SchedulerApplication<FSAppAttempt>(queue, user); - applications.put(applicationId, application); - queue.getMetrics().submitApp(user); - - LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queue.getName() - + ", currently num of applications: " + applications.size()); - if (isAppRecovering) { - if (LOG.isDebugEnabled()) { - LOG.debug(applicationId - + " is recovering. Skip notifying APP_ACCEPTED"); + SchedulerApplication<FSAppAttempt> application = + new SchedulerApplication<FSAppAttempt>(queue, user); + applications.put(applicationId, application); + queue.getMetrics().submitApp(user); + + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queue.getName() + + ", currently num of applications: " + applications.size()); + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else{ + rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } - } else { - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } finally { + writeLock.unlock(); } } /** * Add a new application attempt to the scheduler. */ - protected synchronized void addApplicationAttempt( + protected void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { - SchedulerApplication<FSAppAttempt> application = - applications.get(applicationAttemptId.getApplicationId()); - String user = application.getUser(); - FSLeafQueue queue = (FSLeafQueue) application.getQueue(); - - FSAppAttempt attempt = - new FSAppAttempt(this, applicationAttemptId, user, - queue, new ActiveUsersManager(getRootQueueMetrics()), - rmContext); - if (transferStateFromPreviousAttempt) { - attempt.transferStateFromPreviousAttempt(application - .getCurrentAppAttempt()); - } - application.setCurrentAppAttempt(attempt); - - boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); - queue.addApp(attempt, runnable); - if (runnable) { - maxRunningEnforcer.trackRunnableApp(attempt); - } else { - maxRunningEnforcer.trackNonRunnableApp(attempt); - } - - queue.getMetrics().submitAppAttempt(user); + try { + writeLock.lock(); + SchedulerApplication<FSAppAttempt> application = applications.get( + applicationAttemptId.getApplicationId()); + String user = application.getUser(); + FSLeafQueue queue = (FSLeafQueue) application.getQueue(); + + FSAppAttempt attempt = new FSAppAttempt(this, applicationAttemptId, user, + queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext); + if (transferStateFromPreviousAttempt) { + attempt.transferStateFromPreviousAttempt( + application.getCurrentAppAttempt()); + } + application.setCurrentAppAttempt(attempt); + + boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user); + queue.addApp(attempt, runnable); + if (runnable) { + maxRunningEnforcer.trackRunnableApp(attempt); + } else{ + maxRunningEnforcer.trackNonRunnableApp(attempt); + } - LOG.info("Added Application Attempt " + applicationAttemptId - + " to scheduler from user: " + user); + queue.getMetrics().submitAppAttempt(user); - if (isAttemptRecovering) { - if (LOG.isDebugEnabled()) { - LOG.debug(applicationAttemptId - + " is recovering. Skipping notifying ATTEMPT_ADDED"); + LOG.info("Added Application Attempt " + applicationAttemptId + + " to scheduler from user: " + user); + + if (isAttemptRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); + } + } else{ + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); } - } else { - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + } finally { + writeLock.unlock(); } } @@ -770,70 +796,71 @@ public class FairScheduler extends return queue; } - private synchronized void removeApplication(ApplicationId applicationId, + private void removeApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication<FSAppAttempt> application = - applications.get(applicationId); - if (application == null){ + SchedulerApplication<FSAppAttempt> application = applications.remove( + applicationId); + if (application == null) { LOG.warn("Couldn't find application " + applicationId); - return; + } else{ + application.stop(finalState); } - application.stop(finalState); - applications.remove(applicationId); } - private synchronized void removeApplicationAttempt( + private void removeApplicationAttempt( ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { - LOG.info("Application " + applicationAttemptId + " is done." + - " finalState=" + rmAppAttemptFinalState); - SchedulerApplication<FSAppAttempt> application = - applications.get(applicationAttemptId.getApplicationId()); - FSAppAttempt attempt = getSchedulerApp(applicationAttemptId); - - if (attempt == null || application == null) { - LOG.info("Unknown application " + applicationAttemptId + " has completed!"); - return; - } - - // Release all the running containers - for (RMContainer rmContainer : attempt.getLiveContainers()) { - if (keepContainers - && rmContainer.getState().equals(RMContainerState.RUNNING)) { - // do not kill the running container in the case of work-preserving AM - // restart. - LOG.info("Skip killing " + rmContainer.getContainerId()); - continue; + try { + writeLock.lock(); + LOG.info( + "Application " + applicationAttemptId + " is done." + " finalState=" + + rmAppAttemptFinalState); + FSAppAttempt attempt = getApplicationAttempt(applicationAttemptId); + + if (attempt == null) { + LOG.info( + "Unknown application " + applicationAttemptId + " has completed!"); + return; } - super.completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - rmContainer.getContainerId(), - SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); - } - // Release all reserved containers - for (RMContainer rmContainer : attempt.getReservedContainers()) { - super.completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - rmContainer.getContainerId(), - "Application Complete"), - RMContainerEventType.KILL); - } - // Clean up pending requests, metrics etc. - attempt.stop(rmAppAttemptFinalState); - - // Inform the queue - FSLeafQueue queue = queueMgr.getLeafQueue(attempt.getQueue() - .getQueueName(), false); - boolean wasRunnable = queue.removeApp(attempt); + // Release all the running containers + for (RMContainer rmContainer : attempt.getLiveContainers()) { + if (keepContainers && rmContainer.getState().equals( + RMContainerState.RUNNING)) { + // do not kill the running container in the case of work-preserving AM + // restart. + LOG.info("Skip killing " + rmContainer.getContainerId()); + continue; + } + super.completedContainer(rmContainer, SchedulerUtils + .createAbnormalContainerStatus(rmContainer.getContainerId(), + SchedulerUtils.COMPLETED_APPLICATION), + RMContainerEventType.KILL); + } - if (wasRunnable) { - maxRunningEnforcer.untrackRunnableApp(attempt); - maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, - attempt.getQueue()); - } else { - maxRunningEnforcer.untrackNonRunnableApp(attempt); + // Release all reserved containers + for (RMContainer rmContainer : attempt.getReservedContainers()) { + super.completedContainer(rmContainer, SchedulerUtils + .createAbnormalContainerStatus(rmContainer.getContainerId(), + "Application Complete"), RMContainerEventType.KILL); + } + // Clean up pending requests, metrics etc. + attempt.stop(rmAppAttemptFinalState); + + // Inform the queue + FSLeafQueue queue = queueMgr.getLeafQueue( + attempt.getQueue().getQueueName(), false); + boolean wasRunnable = queue.removeApp(attempt); + + if (wasRunnable) { + maxRunningEnforcer.untrackRunnableApp(attempt); + maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, + attempt.getQueue()); + } else{ + maxRunningEnforcer.untrackNonRunnableApp(attempt); + } + } finally { + writeLock.unlock(); } } @@ -841,97 +868,108 @@ public class FairScheduler extends * Clean up a completed container. */ @Override - protected synchronized void completedContainerInternal( + protected void completedContainerInternal( RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { + try { + writeLock.lock(); + Container container = rmContainer.getContainer(); + + // Get the application for the finished container + FSAppAttempt application = getCurrentAttemptForContainer( + container.getId()); + ApplicationId appId = + container.getId().getApplicationAttemptId().getApplicationId(); + if (application == null) { + LOG.info( + "Container " + container + " of" + " finished application " + appId + + " completed with event " + event); + return; + } - Container container = rmContainer.getContainer(); - - // Get the application for the finished container - FSAppAttempt application = - getCurrentAttemptForContainer(container.getId()); - ApplicationId appId = - container.getId().getApplicationAttemptId().getApplicationId(); - if (application == null) { - LOG.info("Container " + container + " of" + - " finished application " + appId + - " completed with event " + event); - return; - } - - // Get the node on which the container was allocated - FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); + // Get the node on which the container was allocated + FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); - if (rmContainer.getState() == RMContainerState.RESERVED) { - application.unreserve(rmContainer.getReservedSchedulerKey(), node); - } else { - application.containerCompleted(rmContainer, containerStatus, event); - node.releaseContainer(container); - updateRootQueueMetrics(); - } + if (rmContainer.getState() == RMContainerState.RESERVED) { + application.unreserve(rmContainer.getReservedSchedulerKey(), node); + } else{ + application.containerCompleted(rmContainer, containerStatus, event); + node.releaseContainer(container); + updateRootQueueMetrics(); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Application attempt " + application.getApplicationAttemptId() - + " released container " + container.getId() + " on node: " + node - + " with event: " + event); + if (LOG.isDebugEnabled()) { + LOG.debug("Application attempt " + application.getApplicationAttemptId() + + " released container " + container.getId() + " on node: " + node + + " with event: " + event); + } + } finally { + writeLock.unlock(); } } - private synchronized void addNode(List<NMContainerStatus> containerReports, + private void addNode(List<NMContainerStatus> containerReports, RMNode node) { - FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName); - nodeTracker.addNode(schedulerNode); + try { + writeLock.lock(); + FSSchedulerNode schedulerNode = new FSSchedulerNode(node, + usePortForNodeName); + nodeTracker.addNode(schedulerNode); - triggerUpdate(); + triggerUpdate(); - Resource clusterResource = getClusterResource(); - queueMgr.getRootQueue().setSteadyFairShare(clusterResource); - queueMgr.getRootQueue().recomputeSteadyShares(); - LOG.info("Added node " + node.getNodeAddress() + - " cluster capacity: " + clusterResource); + Resource clusterResource = getClusterResource(); + queueMgr.getRootQueue().setSteadyFairShare(clusterResource); + queueMgr.getRootQueue().recomputeSteadyShares(); + LOG.info("Added node " + node.getNodeAddress() + " cluster capacity: " + + clusterResource); - recoverContainersOnNode(containerReports, node); - updateRootQueueMetrics(); + recoverContainersOnNode(containerReports, node); + updateRootQueueMetrics(); + } finally { + writeLock.unlock(); + } } - private synchronized void removeNode(RMNode rmNode) { - NodeId nodeId = rmNode.getNodeID(); - FSSchedulerNode node = nodeTracker.getNode(nodeId); - if (node == null) { - LOG.error("Attempting to remove non-existent node " + nodeId); - return; - } + private void removeNode(RMNode rmNode) { + try { + writeLock.lock(); + NodeId nodeId = rmNode.getNodeID(); + FSSchedulerNode node = nodeTracker.getNode(nodeId); + if (node == null) { + LOG.error("Attempting to remove non-existent node " + nodeId); + return; + } - // Remove running containers - List<RMContainer> runningContainers = - node.getCopiedListOfRunningContainers(); - for (RMContainer container : runningContainers) { - super.completedContainer(container, - SchedulerUtils.createAbnormalContainerStatus( - container.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); - } + // Remove running containers + List<RMContainer> runningContainers = + node.getCopiedListOfRunningContainers(); + for (RMContainer container : runningContainers) { + super.completedContainer(container, SchedulerUtils + .createAbnormalContainerStatus(container.getContainerId(), + SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + } - // Remove reservations, if any - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - super.completedContainer(reservedContainer, - SchedulerUtils.createAbnormalContainerStatus( - reservedContainer.getContainerId(), - SchedulerUtils.LOST_CONTAINER), - RMContainerEventType.KILL); - } + // Remove reservations, if any + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + super.completedContainer(reservedContainer, SchedulerUtils + .createAbnormalContainerStatus(reservedContainer.getContainerId(), + SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); + } - nodeTracker.removeNode(nodeId); - Resource clusterResource = getClusterResource(); - queueMgr.getRootQueue().setSteadyFairShare(clusterResource); - queueMgr.getRootQueue().recomputeSteadyShares(); - updateRootQueueMetrics(); - triggerUpdate(); + nodeTracker.removeNode(nodeId); + Resource clusterResource = getClusterResource(); + queueMgr.getRootQueue().setSteadyFairShare(clusterResource); + queueMgr.getRootQueue().recomputeSteadyShares(); + updateRootQueueMetrics(); + triggerUpdate(); - LOG.info("Removed node " + rmNode.getNodeAddress() + - " cluster capacity: " + clusterResource); + LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + + clusterResource); + } finally { + writeLock.unlock(); + } } @Override @@ -960,12 +998,13 @@ public class FairScheduler extends // Release containers releaseContainers(release, application); - synchronized (application) { + try { + application.getWriteLock().lock(); if (!ask.isEmpty()) { if (LOG.isDebugEnabled()) { - LOG.debug("allocate: pre-update" + - " applicationAttemptId=" + appAttemptId + - " application=" + application.getApplicationId()); + LOG.debug( + "allocate: pre-update" + " applicationAttemptId=" + appAttemptId + + " application=" + application.getApplicationId()); } application.showRequests(); @@ -974,98 +1013,107 @@ public class FairScheduler extends application.showRequests(); } + } finally { + application.getWriteLock().unlock(); + } - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: post-update" + - " applicationAttemptId=" + appAttemptId + - " #ask=" + ask.size() + - " reservation= " + application.getCurrentReservation()); - - LOG.debug("Preempting " + application.getPreemptionContainers().size() - + " container(s)"); - } + if (LOG.isDebugEnabled()) { + LOG.debug( + "allocate: post-update" + " applicationAttemptId=" + appAttemptId + + " #ask=" + ask.size() + " reservation= " + application + .getCurrentReservation()); - Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>(); - for (RMContainer container : application.getPreemptionContainers()) { - preemptionContainerIds.add(container.getContainerId()); - } + LOG.debug("Preempting " + application.getPreemptionContainers().size() + + " container(s)"); + } - application.updateBlacklist(blacklistAdditions, blacklistRemovals); + Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>(); + for (RMContainer container : application.getPreemptionContainers()) { + preemptionContainerIds.add(container.getContainerId()); + } - List<Container> newlyAllocatedContainers = - application.pullNewlyAllocatedContainers(); - // Record container allocation time - if (!(newlyAllocatedContainers.isEmpty())) { - application.recordContainerAllocationTime(getClock().getTime()); - } + application.updateBlacklist(blacklistAdditions, blacklistRemovals); - Resource headroom = application.getHeadroom(); - application.setApplicationHeadroomForMetrics(headroom); - return new Allocation(newlyAllocatedContainers, headroom, - preemptionContainerIds, null, null, application.pullUpdatedNMTokens()); + List<Container> newlyAllocatedContainers = + application.pullNewlyAllocatedContainers(); + // Record container allocation time + if (!(newlyAllocatedContainers.isEmpty())) { + application.recordContainerAllocationTime(getClock().getTime()); } + + Resource headroom = application.getHeadroom(); + application.setApplicationHeadroomForMetrics(headroom); + return new Allocation(newlyAllocatedContainers, headroom, + preemptionContainerIds, null, null, + application.pullUpdatedNMTokens()); } /** * Process a heartbeat update from a node. */ - private synchronized void nodeUpdate(RMNode nm) { - long start = getClock().getTime(); - if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + - " cluster capacity: " + getClusterResource()); - } - eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); - - List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); - List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); - List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>(); - for(UpdatedContainerInfo containerInfo : containerInfoList) { - newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); - completedContainers.addAll(containerInfo.getCompletedContainers()); - } - // Processing the newly launched containers - for (ContainerStatus launchedContainer : newlyLaunchedContainers) { - containerLaunchedOnNode(launchedContainer.getContainerId(), node); - } - - // Process completed containers - for (ContainerStatus completedContainer : completedContainers) { - ContainerId containerId = completedContainer.getContainerId(); - LOG.debug("Container FINISHED: " + containerId); - super.completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); - } + private void nodeUpdate(RMNode nm) { + try { + writeLock.lock(); + long start = getClock().getTime(); + if (LOG.isDebugEnabled()) { + LOG.debug( + "nodeUpdate: " + nm + " cluster capacity: " + getClusterResource()); + } + eventLog.log("HEARTBEAT", nm.getHostName()); + FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); + + List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); + List<ContainerStatus> newlyLaunchedContainers = + new ArrayList<ContainerStatus>(); + List<ContainerStatus> completedContainers = + new ArrayList<ContainerStatus>(); + for (UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers.addAll( + containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } - // If the node is decommissioning, send an update to have the total - // resource equal to the used resource, so no available resource to - // schedule. - if (nm.getState() == NodeState.DECOMMISSIONING) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption - .newInstance(getSchedulerNode(nm.getNodeID()) - .getAllocatedResource(), 0))); - } + // Process completed containers + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.debug("Container FINISHED: " + containerId); + super.completedContainer(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); + } - if (continuousSchedulingEnabled) { - if (!completedContainers.isEmpty()) { + // If the node is decommissioning, send an update to have the total + // resource equal to the used resource, so no available resource to + // schedule. + if (nm.getState() == NodeState.DECOMMISSIONING) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption + .newInstance( + getSchedulerNode(nm.getNodeID()).getAllocatedResource(), + 0))); + } + + if (continuousSchedulingEnabled) { + if (!completedContainers.isEmpty()) { + attemptScheduling(node); + } + } else{ attemptScheduling(node); } - } else { - attemptScheduling(node); - } - // Updating node resource utilization - node.setAggregatedContainersUtilization( - nm.getAggregatedContainersUtilization()); - node.setNodeUtilization(nm.getNodeUtilization()); + // Updating node resource utilization + node.setAggregatedContainersUtilization( + nm.getAggregatedContainersUtilization()); + node.setNodeUtilization(nm.getNodeUtilization()); - long duration = getClock().getTime() - start; - fsOpDurations.addNodeUpdateDuration(duration); + long duration = getClock().getTime() - start; + fsOpDurations.addNodeUpdateDuration(duration); + } finally { + writeLock.unlock(); + } } void continuousSchedulingAttempt() throws InterruptedException { @@ -1126,52 +1174,59 @@ public class FairScheduler extends } @VisibleForTesting - synchronized void attemptScheduling(FSSchedulerNode node) { - if (rmContext.isWorkPreservingRecoveryEnabled() - && !rmContext.isSchedulerReadyForAllocatingContainers()) { - return; - } + void attemptScheduling(FSSchedulerNode node) { + try { + writeLock.lock(); + if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext + .isSchedulerReadyForAllocatingContainers()) { + return; + } - final NodeId nodeID = node.getNodeID(); - if (!nodeTracker.exists(nodeID)) { - // The node might have just been removed while this thread was waiting - // on the synchronized lock before it entered this synchronized method - LOG.info("Skipping scheduling as the node " + nodeID + - " has been removed"); - return; - } + final NodeId nodeID = node.getNodeID(); + if (!nodeTracker.exists(nodeID)) { + // The node might have just been removed while this thread was waiting + // on the synchronized lock before it entered this synchronized method + LOG.info( + "Skipping scheduling as the node " + nodeID + " has been removed"); + return; + } - // Assign new containers... - // 1. Check for reserved applications - // 2. Schedule if there are no reservations - - boolean validReservation = false; - FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); - if (reservedAppSchedulable != null) { - validReservation = reservedAppSchedulable.assignReservedContainer(node); - } - if (!validReservation) { - // No reservation, schedule at queue which is farthest below fair share - int assignedContainers = 0; - Resource assignedResource = Resources.clone(Resources.none()); - Resource maxResourcesToAssign = - Resources.multiply(node.getUnallocatedResource(), 0.5f); - while (node.getReservedContainer() == null) { - boolean assignedContainer = false; - Resource assignment = queueMgr.getRootQueue().assignContainer(node); - if (!assignment.equals(Resources.none())) { - assignedContainers++; - assignedContainer = true; - Resources.addTo(assignedResource, assignment); - } - if (!assignedContainer) { break; } - if (!shouldContinueAssigning(assignedContainers, - maxResourcesToAssign, assignedResource)) { - break; + // Assign new containers... + // 1. Check for reserved applications + // 2. Schedule if there are no reservations + + boolean validReservation = false; + FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); + if (reservedAppSchedulable != null) { + validReservation = reservedAppSchedulable.assignReservedContainer(node); + } + if (!validReservation) { + // No reservation, schedule at queue which is farthest below fair share + int assignedContainers = 0; + Resource assignedResource = Resources.clone(Resources.none()); + Resource maxResourcesToAssign = Resources.multiply( + node.getUnallocatedResource(), 0.5f); + while (node.getReservedContainer() == null) { + boolean assignedContainer = false; + Resource assignment = queueMgr.getRootQueue().assignContainer(node); + if (!assignment.equals(Resources.none())) { + assignedContainers++; + assignedContainer = true; + Resources.addTo(assignedResource, assignment); + } + if (!assignedContainer) { + break; + } + if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign, + assignedResource)) { + break; + } } } + updateRootQueueMetrics(); + } finally { + writeLock.unlock(); } - updateRootQueueMetrics(); } public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { @@ -1314,51 +1369,55 @@ public class FairScheduler extends } } - private synchronized String resolveReservationQueueName(String queueName, + private String resolveReservationQueueName(String queueName, ApplicationId applicationId, ReservationId reservationID, boolean isRecovering) { - FSQueue queue = queueMgr.getQueue(queueName); - if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) { - return queueName; - } - // Use fully specified name from now on (including root. prefix) - queueName = queue.getQueueName(); - if (reservationID != null) { - String resQName = queueName + "." + reservationID.toString(); - queue = queueMgr.getQueue(resQName); - if (queue == null) { - // reservation has terminated during failover - if (isRecovering && allocConf.getMoveOnExpiry(queueName)) { - // move to the default child queue of the plan - return getDefaultQueueForPlanQueue(queueName); + try { + readLock.lock(); + FSQueue queue = queueMgr.getQueue(queueName); + if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) { + return queueName; + } + // Use fully specified name from now on (including root. prefix) + queueName = queue.getQueueName(); + if (reservationID != null) { + String resQName = queueName + "." + reservationID.toString(); + queue = queueMgr.getQueue(resQName); + if (queue == null) { + // reservation has terminated during failover + if (isRecovering && allocConf.getMoveOnExpiry(queueName)) { + // move to the default child queue of the plan + return getDefaultQueueForPlanQueue(queueName); + } + String message = "Application " + applicationId + + " submitted to a reservation which is not yet " + + "currently active: " + resQName; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return null; } - String message = - "Application " - + applicationId - + " submitted to a reservation which is not yet currently active: " - + resQName; - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, message)); - return null; - } - if (!queue.getParent().getQueueName().equals(queueName)) { - String message = - "Application: " + applicationId + " submitted to a reservation " - + resQName + " which does not belong to the specified queue: " - + queueName; - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, - RMAppEventType.APP_REJECTED, message)); - return null; - } - // use the reservation queue to run the app - queueName = resQName; - } else { - // use the default child queue of the plan for unreserved apps - queueName = getDefaultQueueForPlanQueue(queueName); + if (!queue.getParent().getQueueName().equals(queueName)) { + String message = + "Application: " + applicationId + " submitted to a reservation " + + resQName + " which does not belong to the specified queue: " + + queueName; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return null; + } + // use the reservation queue to run the app + queueName = resQName; + } else{ + // use the default child queue of the plan for unreserved apps + queueName = getDefaultQueueForPlanQueue(queueName); + } + return queueName; + } finally { + readLock.unlock(); } - return queueName; + } private String getDefaultQueueForPlanQueue(String queueName) { @@ -1372,12 +1431,13 @@ public class FairScheduler extends // NOT IMPLEMENTED } - public synchronized void setRMContext(RMContext rmContext) { + public void setRMContext(RMContext rmContext) { this.rmContext = rmContext; } private void initScheduler(Configuration conf) throws IOException { - synchronized (this) { + try { + writeLock.lock(); this.conf = new FairSchedulerConfiguration(conf); validateConf(this.conf); minimumAllocation = this.conf.getMinimumAllocation(); @@ -1385,8 +1445,7 @@ public class FairScheduler extends incrAllocation = this.conf.getIncrementAllocation(); updateReservationThreshold(); continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); - continuousSchedulingSleepMs = - this.conf.getContinuousSchedulingSleepMs(); + continuousSchedulingSleepMs = this.conf.getContinuousSchedulingSleepMs(); nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); rackLocalityThreshold = this.conf.getLocalityThresholdRack(); nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); @@ -1407,8 +1466,8 @@ public class FairScheduler extends if (updateInterval < 0) { updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS; LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS - + " is invalid, so using default value " + - +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS + + " is invalid, so using default value " + + +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS + " ms instead"); } @@ -1416,8 +1475,7 @@ public class FairScheduler extends fsOpDurations = FSOpDurations.getInstance(true); // This stores per-application scheduling information - this.applications = new ConcurrentHashMap< - ApplicationId, SchedulerApplication<FSAppAttempt>>(); + this.applications = new ConcurrentHashMap<>(); this.eventLog = new FairSchedulerEventLog(); eventLog.init(this.conf); @@ -1438,6 +1496,8 @@ public class FairScheduler extends schedulingThread.setName("FairSchedulerContinuousScheduling"); schedulingThread.setDaemon(true); } + } finally { + writeLock.unlock(); } allocsLoader.init(conf); @@ -1460,15 +1520,21 @@ public class FairScheduler extends reservationThreshold = newThreshold; } - private synchronized void startSchedulerThreads() { - Preconditions.checkNotNull(updateThread, "updateThread is null"); - Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); - updateThread.start(); - if (continuousSchedulingEnabled) { - Preconditions.checkNotNull(schedulingThread, "schedulingThread is null"); - schedulingThread.start(); + private void startSchedulerThreads() { + try { + writeLock.lock(); + Preconditions.checkNotNull(updateThread, "updateThread is null"); + Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); + updateThread.start(); + if (continuousSchedulingEnabled) { + Preconditions.checkNotNull(schedulingThread, + "schedulingThread is null"); + schedulingThread.start(); + } + allocsLoader.start(); + } finally { + writeLock.unlock(); } - allocsLoader.start(); } @Override @@ -1485,7 +1551,8 @@ public class FairScheduler extends @Override public void serviceStop() throws Exception { - synchronized (this) { + try { + writeLock.lock(); if (updateThread != null) { updateThread.interrupt(); updateThread.join(THREAD_JOIN_TIMEOUT_MS); @@ -1499,6 +1566,8 @@ public class FairScheduler extends if (allocsLoader != null) { allocsLoader.stop(); } + } finally { + writeLock.unlock(); } super.serviceStop(); @@ -1542,17 +1611,22 @@ public class FairScheduler extends } @Override - public synchronized boolean checkAccess(UserGroupInformation callerUGI, + public boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName) { - FSQueue queue = getQueueManager().getQueue(queueName); - if (queue == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("ACL not found for queue access-type " + acl - + " for queue " + queueName); + try { + readLock.lock(); + FSQueue queue = getQueueManager().getQueue(queueName); + if (queue == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("ACL not found for queue access-type " + acl + " for queue " + + queueName); + } + return false; } - return false; + return queue.hasAccess(acl, callerUGI); + } finally { + readLock.unlock(); } - return queue.hasAccess(acl, callerUGI); } public AllocationConfiguration getAllocationConfiguration() { @@ -1566,12 +1640,16 @@ public class FairScheduler extends public void onReload(AllocationConfiguration queueInfo) { // Commit the reload; also create any queue defined in the alloc file // if it does not already exist, so it can be displayed on the web UI. - synchronized (FairScheduler.this) { + + writeLock.lock(); + try { allocConf = queueInfo; allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource()); queueMgr.updateAllocationConfiguration(allocConf); applyChildDefaults(); maxRunningEnforcer.updateRunnabilityOnReload(); + } finally { + writeLock.unlock(); } } } @@ -1616,32 +1694,41 @@ public class FairScheduler extends } @Override - public synchronized String moveApplication(ApplicationId appId, + public String moveApplication(ApplicationId appId, String queueName) throws YarnException { - SchedulerApplication<FSAppAttempt> app = applications.get(appId); - if (app == null) { - throw new YarnException("App to be moved " + appId + " not found."); - } - FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt(); - // To serialize with FairScheduler#allocate, synchronize on app attempt - synchronized (attempt) { - FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); - String destQueueName = handleMoveToPlanQueue(queueName); - FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); - if (targetQueue == null) { - throw new YarnException("Target queue " + queueName - + " not found or is not a leaf queue."); - } - if (targetQueue == oldQueue) { - return oldQueue.getQueueName(); + try { + writeLock.lock(); + SchedulerApplication<FSAppAttempt> app = applications.get(appId); + if (app == null) { + throw new YarnException("App to be moved " + appId + " not found."); } - - if (oldQueue.isRunnableApp(attempt)) { - verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue); + FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt(); + // To serialize with FairScheduler#allocate, synchronize on app attempt + + try { + attempt.getWriteLock().lock(); + FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); + String destQueueName = handleMoveToPlanQueue(queueName); + FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); + if (targetQueue == null) { + throw new YarnException("Target queue " + queueName + + " not found or is not a leaf queue."); + } + if (targetQueue == oldQueue) { + return oldQueue.getQueueName(); + } + + if (oldQueue.isRunnableApp(attempt)) { + verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue); + } + + executeMove(app, attempt, oldQueue, targetQueue); + return targetQueue.getQueueName(); + } finally { + attempt.getWriteLock().unlock(); } - - executeMove(app, attempt, oldQueue, targetQueue); - return targetQueue.getQueueName(); + } finally { + writeLock.unlock(); } } @@ -1737,12 +1824,17 @@ public class FairScheduler extends * Process resource update on a node and update Queue. */ @Override - public synchronized void updateNodeResource(RMNode nm, + public void updateNodeResource(RMNode nm, ResourceOption resourceOption) { - super.updateNodeResource(nm, resourceOption); - updateRootQueueMetrics(); - queueMgr.getRootQueue().setSteadyFairShare(getClusterResource()); - queueMgr.getRootQueue().recomputeSteadyShares(); + try { + writeLock.lock(); + super.updateNodeResource(nm, resourceOption); + updateRootQueueMetrics(); + queueMgr.getRootQueue().setSteadyFairShare(getClusterResource()); + queueMgr.getRootQueue().recomputeSteadyShares(); + } finally { + writeLock.unlock(); + } } /** {@inheritDoc} */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
