YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating resources based on node-labels. Contributed by Wangda Tan. YARN-2500. Enhanced ResourceManager to support schedulers allocating resources based on node-labels. Contributed by Wangda Tan.
(cherry picked from commit f2ea555ac6c06a3f2f6559731f48711fff05d3f1) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e8e3a362 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e8e3a362 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e8e3a362 Branch: refs/heads/branch-2 Commit: e8e3a36213936f69efdc765103700acdc2ac9d5a Parents: 08eeb3e Author: Vinod Kumar Vavilapalli <vino...@apache.org> Authored: Wed Oct 15 18:33:06 2014 -0700 Committer: Vinod Kumar Vavilapalli <vino...@apache.org> Committed: Wed Oct 15 18:34:19 2014 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 5 + .../yarn/sls/scheduler/RMNodeWrapper.java | 5 + hadoop-yarn-project/CHANGES.txt | 6 + .../dev-support/findbugs-exclude.xml | 17 + .../ApplicationMasterService.java | 42 +- .../server/resourcemanager/RMAppManager.java | 32 +- .../yarn/server/resourcemanager/RMContext.java | 5 + .../server/resourcemanager/RMContextImpl.java | 12 + .../server/resourcemanager/RMServerUtils.java | 19 +- .../server/resourcemanager/ResourceManager.java | 11 +- .../CapacitySchedulerPlanFollower.java | 17 +- .../server/resourcemanager/rmapp/RMAppImpl.java | 8 +- .../rmapp/attempt/RMAppAttemptImpl.java | 39 +- .../server/resourcemanager/rmnode/RMNode.java | 8 + .../resourcemanager/rmnode/RMNodeImpl.java | 8 + .../server/resourcemanager/scheduler/Queue.java | 19 + .../scheduler/SchedulerUtils.java | 122 +++- .../scheduler/capacity/AbstractCSQueue.java | 448 ++++++++++++++ .../scheduler/capacity/CSQueue.java | 61 +- .../scheduler/capacity/CSQueueUtils.java | 57 +- .../scheduler/capacity/CapacityScheduler.java | 48 +- .../CapacitySchedulerConfiguration.java | 148 ++++- .../scheduler/capacity/LeafQueue.java | 596 ++++++++++--------- .../scheduler/capacity/ParentQueue.java | 391 +++++------- .../scheduler/capacity/PlanQueue.java | 8 +- 
.../scheduler/capacity/ReservationQueue.java | 2 +- .../resourcemanager/scheduler/fair/FSQueue.java | 13 + .../scheduler/fifo/FifoScheduler.java | 13 + .../server/resourcemanager/Application.java | 1 + .../yarn/server/resourcemanager/MockAM.java | 35 +- .../yarn/server/resourcemanager/MockNodes.java | 7 +- .../yarn/server/resourcemanager/MockRM.java | 37 +- .../server/resourcemanager/RMHATestBase.java | 5 +- .../server/resourcemanager/TestAppManager.java | 4 +- .../resourcemanager/TestApplicationACLs.java | 3 +- .../resourcemanager/TestClientRMService.java | 54 +- .../yarn/server/resourcemanager/TestRMHA.java | 2 +- .../TestWorkPreservingRMRestart.java | 2 +- .../reservation/ReservationSystemTestUtil.java | 30 + .../rmapp/TestRMAppTransitions.java | 11 +- .../attempt/TestRMAppAttemptTransitions.java | 66 +- .../scheduler/TestSchedulerUtils.java | 279 +++++++-- .../capacity/TestApplicationLimits.java | 11 +- .../scheduler/capacity/TestCSQueueUtils.java | 28 +- .../capacity/TestCapacityScheduler.java | 28 +- .../scheduler/capacity/TestChildQueueOrder.java | 5 +- .../capacity/TestContainerAllocation.java | 460 ++++++++++++++ .../scheduler/capacity/TestLeafQueue.java | 37 +- .../scheduler/capacity/TestParentQueue.java | 5 +- .../scheduler/capacity/TestQueueMappings.java | 10 +- .../scheduler/capacity/TestQueueParsing.java | 267 ++++++++- .../capacity/TestReservationQueue.java | 9 +- .../scheduler/capacity/TestReservations.java | 30 +- .../scheduler/capacity/TestUtils.java | 31 +- .../scheduler/fair/FairSchedulerTestBase.java | 2 +- .../scheduler/fair/TestFairScheduler.java | 2 +- .../resourcemanager/webapp/TestRMWebApp.java | 42 +- 57 files changed, 2867 insertions(+), 796 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java 
---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 029fa87..fdddcf4 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.sls.nodemanager; import java.util.ArrayList; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -159,6 +160,10 @@ public class NodeInfo { return null; } + @Override + public Set<String> getNodeLabels() { + return null; + } } public static RMNode newNodeInfo(String rackName, String hostName, http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 7eca66f..3b185ae 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode import java.util.Collections; import java.util.List; +import java.util.Set; @Private @Unstable @@ -147,4 +148,8 @@ public class RMNodeWrapper implements RMNode { return node.getNodeManagerVersion(); } + @Override + public Set<String> getNodeLabels() { + return null; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0073c41..9789222 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -138,6 +138,12 @@ Release 2.6.0 - UNRELEASED YARN-2656. Made RM web services authentication filter support proxy user. (Varun Vasudev and Zhijie Shen via zjshen) + YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating + resources based on node-labels. (Wangda Tan via vinodkv) + + YARN-2500. Ehnaced ResourceManager to support schedulers allocating resources + based on node-labels. (Wangda Tan via vinodkv) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 6e82af0..e6da24c 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -188,6 +188,23 @@ </Or> <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> + <Match> + <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue" /> + <Or> + <Field name="absoluteCapacity" /> + <Field name="absoluteMaxCapacity" /> + <Field name="acls" /> + <Field name="capacity" /> + <Field name="maximumCapacity" /> + <Field name="state" /> + <Field name="labelManager" /> + <Field name="defaultLabelExpression" /> + <Field name="accessibleLabels" /> + <Field name="absoluteNodeLabelCapacities" /> + <Field name="reservationsContinueLooking" /> + </Or> + <Bug 
pattern="IS2_INCONSISTENT_SYNC" /> + </Match> <!-- Inconsistent sync warning - scheduleAsynchronously is only initialized once and never changed --> <Match> <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler" /> http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 707cf1b..35baa44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NMToken; @@ -254,13 +255,13 @@ public class ApplicationMasterService extends AbstractService implements if (hasApplicationMasterRegistered(applicationAttemptId)) { String message 
= "Application Master is already registered : " - + applicationAttemptId.getApplicationId(); + + appID; LOG.warn(message); RMAuditLogger.logFailure( this.rmContext.getRMApps() - .get(applicationAttemptId.getApplicationId()).getUser(), + .get(appID).getUser(), AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, - applicationAttemptId.getApplicationId(), applicationAttemptId); + appID, applicationAttemptId); throw new InvalidApplicationMasterRequestException(message); } @@ -340,6 +341,7 @@ public class ApplicationMasterService extends AbstractService implements ApplicationAttemptId applicationAttemptId = authorizeRequest().getApplicationAttemptId(); + ApplicationId appId = applicationAttemptId.getApplicationId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); if (lock == null) { @@ -351,13 +353,13 @@ public class ApplicationMasterService extends AbstractService implements if (!hasApplicationMasterRegistered(applicationAttemptId)) { String message = "Application Master is trying to unregister before registering for: " - + applicationAttemptId.getApplicationId(); + + appId; LOG.error(message); RMAuditLogger.logFailure( this.rmContext.getRMApps() - .get(applicationAttemptId.getApplicationId()).getUser(), + .get(appId).getUser(), AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService", - message, applicationAttemptId.getApplicationId(), + message, appId, applicationAttemptId); throw new ApplicationMasterNotRegisteredException(message); } @@ -365,7 +367,7 @@ public class ApplicationMasterService extends AbstractService implements this.amLivelinessMonitor.receivedPing(applicationAttemptId); RMApp rmApp = - rmContext.getRMApps().get(applicationAttemptId.getApplicationId()); + rmContext.getRMApps().get(appId); if (rmApp.isAppFinalStateStored()) { return FinishApplicationMasterResponse.newInstance(true); @@ -418,6 +420,7 @@ public class ApplicationMasterService extends AbstractService implements ApplicationAttemptId appAttemptId = 
amrmTokenIdentifier.getApplicationAttemptId(); + ApplicationId applicationId = appAttemptId.getApplicationId(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -432,14 +435,14 @@ public class ApplicationMasterService extends AbstractService implements if (!hasApplicationMasterRegistered(appAttemptId)) { String message = "Application Master is not registered for known application: " - + appAttemptId.getApplicationId() + + applicationId + ". Let AM resync."; LOG.info(message); RMAuditLogger.logFailure( - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + this.rmContext.getRMApps().get(applicationId) .getUser(), AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, - appAttemptId.getApplicationId(), + applicationId, appAttemptId); return resync; } @@ -481,11 +484,22 @@ public class ApplicationMasterService extends AbstractService implements List<String> blacklistRemovals = (blacklistRequest != null) ? blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; - + RMApp app = + this.rmContext.getRMApps().get(applicationId); + + // set label expression for Resource Requests + ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); + for (ResourceRequest req : ask) { + if (null == req.getNodeLabelExpression()) { + req.setNodeLabelExpression(asc.getNodeLabelExpression()); + } + } + // sanity check try { RMServerUtils.validateResourceRequests(ask, - rScheduler.getMaximumResourceCapability()); + rScheduler.getMaximumResourceCapability(), app.getQueue(), + rScheduler); } catch (InvalidResourceRequestException e) { LOG.warn("Invalid resource ask by application " + appAttemptId, e); throw e; @@ -498,8 +512,6 @@ public class ApplicationMasterService extends AbstractService implements throw e; } - RMApp app = - this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); // In the case of work-preserving AM restart, it's possible for the // AM to release containers from the earlier attempt. 
if (!app.getApplicationSubmissionContext() @@ -582,7 +594,7 @@ public class ApplicationMasterService extends AbstractService implements .toString(), amrmToken.getPassword(), amrmToken.getService() .toString())); LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back" - + " to application: " + appAttemptId.getApplicationId()); + + " to application: " + applicationId); } /* http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 1d672e5..6e1b925 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -343,7 +343,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>, long submitTime, String user) throws YarnException { ApplicationId applicationId = submissionContext.getApplicationId(); - validateResourceRequest(submissionContext); + ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext); // Create RMApp RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf, @@ -351,7 +351,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>, submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, 
submitTime, submissionContext.getApplicationType(), - submissionContext.getApplicationTags()); + submissionContext.getApplicationTags(), amReq); // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not @@ -373,7 +373,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>, return application; } - private void validateResourceRequest( + private ResourceRequest validateAndCreateResourceRequest( ApplicationSubmissionContext submissionContext) throws InvalidResourceRequestException { // Validation of the ApplicationSubmissionContext needs to be completed @@ -383,18 +383,36 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>, // Check whether AM resource requirements are within required limits if (!submissionContext.getUnmanagedAM()) { - ResourceRequest amReq = BuilderUtils.newResourceRequest( - RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, - submissionContext.getResource(), 1); + ResourceRequest amReq; + if (submissionContext.getAMContainerResourceRequest() != null) { + amReq = submissionContext.getAMContainerResourceRequest(); + } else { + amReq = + BuilderUtils.newResourceRequest( + RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY, + submissionContext.getResource(), 1); + } + + // set label expression for AM container + if (null == amReq.getNodeLabelExpression()) { + amReq.setNodeLabelExpression(submissionContext + .getNodeLabelExpression()); + } + try { SchedulerUtils.validateResourceRequest(amReq, - scheduler.getMaximumResourceCapability()); + scheduler.getMaximumResourceCapability(), + submissionContext.getQueue(), scheduler); } catch (InvalidResourceRequestException e) { LOG.warn("RM app submission failed in validating AM resource request" + " for application " + submissionContext.getApplicationId(), e); throw e; } + + return amReq; } + + return null; } private boolean isApplicationInFinalState(RMAppState rmAppState) { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index a59965f..e824634 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -108,6 +109,10 @@ public interface RMContext { boolean isWorkPreservingRecoveryEnabled(); + RMNodeLabelsManager getNodeLabelManager(); + + public void setNodeLabelManager(RMNodeLabelsManager mgr); + long getEpoch(); ReservationSystem getReservationSystem(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 78787ee..076c3dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -91,6 +92,7 @@ public class RMContextImpl implements RMContext { private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; private ConfigurationProvider configurationProvider; + private RMNodeLabelsManager nodeLabelManager; private long epoch; private Clock systemClock = new SystemClock(); private long 
schedulerRecoveryStartTime = 0; @@ -406,6 +408,16 @@ public class RMContextImpl implements RMContext { this.epoch = epoch; } + @Override + public RMNodeLabelsManager getNodeLabelManager() { + return nodeLabelManager; + } + + @Override + public void setNodeLabelManager(RMNodeLabelsManager mgr) { + nodeLabelManager = mgr; + } + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { this.schedulerRecoveryStartTime = systemClock.getTime(); this.schedulerRecoveryWaitTime = waitTime; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 29c5953..40d86e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import 
org.apache.hadoop.yarn.util.resource.Resources; @@ -84,9 +85,11 @@ public class RMServerUtils { * requested memory/vcore is non-negative and not greater than max */ public static void validateResourceRequests(List<ResourceRequest> ask, - Resource maximumResource) throws InvalidResourceRequestException { + Resource maximumResource, String queueName, YarnScheduler scheduler) + throws InvalidResourceRequestException { for (ResourceRequest resReq : ask) { - SchedulerUtils.validateResourceRequest(resReq, maximumResource); + SchedulerUtils.validateResourceRequest(resReq, maximumResource, + queueName, scheduler); } } @@ -132,17 +135,25 @@ public class RMServerUtils { } } + public static UserGroupInformation verifyAccess( + AccessControlList acl, String method, final Log LOG) + throws IOException { + // by default, this method will use AdminService as module name + return verifyAccess(acl, method, "AdminService", LOG); + } + /** * Utility method to verify if the current user has access based on the * passed {@link AccessControlList} * @param acl the {@link AccessControlList} to check against * @param method the method name to be logged + * @param module, like AdminService or NodeLabelManager * @param LOG the logger to use * @return {@link UserGroupInformation} of the current user * @throws IOException */ public static UserGroupInformation verifyAccess( - AccessControlList acl, String method, final Log LOG) + AccessControlList acl, String method, String module, final Log LOG) throws IOException { UserGroupInformation user; try { @@ -159,7 +170,7 @@ public class RMServerUtils { " to call '" + method + "'"); RMAuditLogger.logFailure(user.getShortUserName(), method, - acl.toString(), "AdminService", + acl.toString(), module, RMAuditLogger.AuditConstants.UNAUTHORIZED_USER); throw new AccessControlException("User " + user.getShortUserName() + 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index ab45020..68cbc7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -320,6 +321,10 @@ public class ResourceManager extends CompositeService implements Recoverable { return new AMLivelinessMonitor(this.rmDispatcher); } + protected RMNodeLabelsManager createNodeLabelManager() { + return new RMNodeLabelsManager(); + } + protected DelegationTokenRenewer 
createDelegationTokenRenewer() { return new DelegationTokenRenewer(); } @@ -399,6 +404,10 @@ public class ResourceManager extends CompositeService implements Recoverable { AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); addService(amFinishingMonitor); rmContext.setAMFinishingMonitor(amFinishingMonitor); + + RMNodeLabelsManager nlm = createNodeLabelManager(); + addService(nlm); + rmContext.setNodeLabelManager(nlm); boolean isRecoveryEnabled = conf.getBoolean( YarnConfiguration.RECOVERY_ENABLED, @@ -962,7 +971,7 @@ public class ResourceManager extends CompositeService implements Recoverable { * instance of {@link RMActiveServices} and initializes it. * @throws Exception */ - void createAndInitActiveServices() throws Exception { + protected void createAndInitActiveServices() throws Exception { activeServices = new RMActiveServices(); activeServices.init(conf); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java index 0c0fbc0..126560a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -126,14 +127,18 @@ public class CapacitySchedulerPlanFollower implements PlanFollower { // create the default reservation queue if it doesnt exist String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; if (scheduler.getQueue(defReservationQueue) == null) { - ReservationQueue defQueue = - new ReservationQueue(scheduler, defReservationQueue, planQueue); try { + ReservationQueue defQueue = + new ReservationQueue(scheduler, defReservationQueue, planQueue); scheduler.addQueue(defQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( "Exception while trying to create default reservation queue for plan: {}", planQueueName, e); + } catch (IOException e) { + LOG.warn( + "Exception while trying to create default reservation queue for plan: {}", + planQueueName, e); } } curReservationNames.add(defReservationQueue); @@ -186,14 +191,18 @@ public class CapacitySchedulerPlanFollower implements PlanFollower { for (ReservationAllocation res : sortedAllocations) { String currResId = res.getReservationId().toString(); if (curReservationNames.contains(currResId)) { - ReservationQueue resQueue = - new ReservationQueue(scheduler, currResId, planQueue); try { + ReservationQueue resQueue = + new ReservationQueue(scheduler, currResId, planQueue); scheduler.addQueue(resQueue); } catch (SchedulerDynamicEditException e) { LOG.warn( "Exception while trying to activate reservation: {} for plan: {}", currResId, planQueueName, e); + } catch (IOException e) { + LOG.warn( + "Exception while trying to activate reservation: {} for plan: {}", + currResId, planQueueName, e); } } 
Resource capToAssign = res.getResourcesAtTime(now); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index c0681aa..1994b36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -143,6 +144,7 @@ public class RMAppImpl implements RMApp, Recoverable { private RMAppEvent eventCausingFinalSaving; private RMAppState targetedFinalState; private RMAppState recoveredFinalState; + private ResourceRequest amReq; Object transitionTodo; @@ -342,7 +344,8 @@ public class RMAppImpl implements RMApp, Recoverable { Configuration config, String name, String user, String queue, ApplicationSubmissionContext submissionContext, YarnScheduler scheduler, 
ApplicationMasterService masterService, long submitTime, - String applicationType, Set<String> applicationTags) { + String applicationType, Set<String> applicationTags, + ResourceRequest amReq) { this.systemClock = new SystemClock(); @@ -361,6 +364,7 @@ public class RMAppImpl implements RMApp, Recoverable { this.startTime = this.systemClock.getTime(); this.applicationType = applicationType; this.applicationTags = applicationTags; + this.amReq = amReq; int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -732,7 +736,7 @@ public class RMAppImpl implements RMApp, Recoverable { // previously failed attempts(which should not include Preempted, // hardware error and NM resync) + 1) equal to the max-attempt // limit. - maxAppAttempts == (getNumFailedAppAttempts() + 1)); + maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index fbcb7d7..b5a6237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -93,7 +93,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -177,6 +176,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private Object transitionTodo; private RMAppAttemptMetrics attemptMetrics = null; + private ResourceRequest amReq = null; private static final StateMachineFactory<RMAppAttemptImpl, RMAppAttemptState, @@ -426,7 +426,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { RMContext rmContext, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationSubmissionContext submissionContext, - Configuration conf, boolean maybeLastAttempt) { + Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq) { this.conf = conf; this.applicationAttemptId = appAttemptId; this.rmContext = rmContext; @@ -442,8 +442,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { this.proxiedTrackingUrl = generateProxyUriWithScheme(null); this.maybeLastAttempt = maybeLastAttempt; this.stateMachine = stateMachineFactory.make(this); + this.attemptMetrics = new RMAppAttemptMetrics(applicationAttemptId, rmContext); + + this.amReq = amReq; } @Override @@ -885,24 +888,34 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { 
private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST = new ArrayList<ResourceRequest>(); - private static final class ScheduleTransition + @VisibleForTesting + public static final class ScheduleTransition implements MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - if (!appAttempt.submissionContext.getUnmanagedAM()) { - // Request a container for the AM. - ResourceRequest request = - BuilderUtils.newResourceRequest( - AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt - .getSubmissionContext().getResource(), 1); - + ApplicationSubmissionContext subCtx = appAttempt.submissionContext; + if (!subCtx.getUnmanagedAM()) { + // Need reset #containers before create new attempt, because this request + // will be passed to scheduler, and scheduler will deduct the number after + // AM container allocated + + // Currently, following fields are all hard code, + // TODO: change these fields when we want to support + // priority/resource-name/relax-locality specification for AM containers + // allocation. 
+ appAttempt.amReq.setNumContainers(1); + appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY); + appAttempt.amReq.setResourceName(ResourceRequest.ANY); + appAttempt.amReq.setRelaxLocality(true); + // SchedulerUtils.validateResourceRequests is not necessary because // AM resource has been checked when submission - Allocation amContainerAllocation = appAttempt.scheduler.allocate( - appAttempt.applicationAttemptId, - Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null); + Allocation amContainerAllocation = + appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, + Collections.singletonList(appAttempt.amReq), + EMPTY_CONTAINER_RELEASE_LIST, null, null); if (amContainerAllocation != null && amContainerAllocation.getContainers() != null) { assert (amContainerAllocation.getContainers().size() == 0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index a423ea5..afbcbc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.List; +import java.util.Set; import 
org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -135,4 +136,11 @@ public interface RMNode { * @return containerUpdates accumulated across NM heartbeats. */ public List<UpdatedContainerInfo> pullContainerUpdates(); + + /** + * Get set of labels in this node + * + * @return labels in this node + */ + public Set<String> getNodeLabels(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index c960b50..13d60ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -855,4 +855,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { public Set<ContainerId> getLaunchedContainers() { return this.launchedContainers; } + + @Override + public Set<String> getNodeLabels() { + if (context.getNodeLabelManager() == null) { + return null; + } + return context.getNodeLabelManager().getLabelsOnNode(nodeId); + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index 0bc8ca1..4663a91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.List; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Evolving; @@ -71,4 +72,22 @@ public interface Queue { */ public void recoverContainer(Resource clusterResource, SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer); + + /** + * Get the node labels that this queue can access. + * labels={*} means this queue can access any label; + * labels={ } means this queue can only access nodes without labels; + * labels={a, b, c} means this queue can access a or b or c. + * @return labels + */ + public Set<String> getAccessibleNodeLabels(); + + /** + * Get the default label expression of this queue. If the label expression of + * ApplicationSubmissionContext and the label expression of ResourceRequest are not + * set, this will be used. 
+ * + * @return default label expression + */ + public String getDefaultNodeLabelExpression(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index ac37c2f..5d00009 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -17,23 +17,29 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.IOException; import java.util.List; +import java.util.Set; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import 
org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.collect.Sets; + /** * Utilities shared by schedulers. */ @@ -190,7 +196,8 @@ public class SchedulerUtils { * request */ public static void validateResourceRequest(ResourceRequest resReq, - Resource maximumResource) throws InvalidResourceRequestException { + Resource maximumResource, String queueName, YarnScheduler scheduler) + throws InvalidResourceRequestException { if (resReq.getCapability().getMemory() < 0 || resReq.getCapability().getMemory() > maximumResource.getMemory()) { throw new InvalidResourceRequestException("Invalid resource request" @@ -209,5 +216,116 @@ public class SchedulerUtils { + resReq.getCapability().getVirtualCores() + ", maxVirtualCores=" + maximumResource.getVirtualCores()); } + + // Get queue from scheduler + QueueInfo queueInfo = null; + try { + queueInfo = scheduler.getQueueInfo(queueName, false, false); + } catch (IOException e) { + // it is possible queue cannot get when queue mapping is set, just ignore + // the queueInfo here, and move forward + } + + // check labels in the resource request. 
+ String labelExp = resReq.getNodeLabelExpression(); + + // if queue has default label expression, and RR doesn't have, use the + // default label expression of queue + if (labelExp == null && queueInfo != null) { + labelExp = queueInfo.getDefaultNodeLabelExpression(); + resReq.setNodeLabelExpression(labelExp); + } + + if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) { + if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(), + labelExp)) { + throw new InvalidResourceRequestException("Invalid resource request" + + ", queue=" + + queueInfo.getQueueName() + + " doesn't have permission to access all labels " + + "in resource request. labelExpression of resource request=" + + labelExp + + ". Queue labels=" + + (queueInfo.getAccessibleNodeLabels() == null ? "" : StringUtils.join(queueInfo + .getAccessibleNodeLabels().iterator(), ','))); + } + } + } + + public static boolean checkQueueAccessToNode(Set<String> queueLabels, + Set<String> nodeLabels) { + // if queue's label is *, it can access any node + if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) { + return true; + } + // any queue can access to a node without label + if (nodeLabels == null || nodeLabels.isEmpty()) { + return true; + } + // a queue can access to a node only if it contains any label of the node + if (queueLabels != null + && Sets.intersection(queueLabels, nodeLabels).size() > 0) { + return true; + } + // sorry, you cannot access + return false; + } + + public static void checkIfLabelInClusterNodeLabels(RMNodeLabelsManager mgr, + Set<String> labels) throws IOException { + if (mgr == null) { + if (labels != null && !labels.isEmpty()) { + throw new IOException("NodeLabelManager is null, please check"); + } + return; + } + + if (labels != null) { + for (String label : labels) { + if (!label.equals(RMNodeLabelsManager.ANY) + && !mgr.containsNodeLabel(label)) { + throw new IOException("NodeLabelManager doesn't include label = " + + label + ", 
please check."); + } + } + } + } + + public static boolean checkNodeLabelExpression(Set<String> nodeLabels, + String labelExpression) { + // empty label expression can only allocate on node with empty labels (a null label set counts as empty, matching the loop below) + if (labelExpression == null || labelExpression.trim().isEmpty()) { + if (nodeLabels != null && !nodeLabels.isEmpty()) { + return false; + } + } + + if (labelExpression != null) { + for (String str : labelExpression.split("&&")) { + if (!str.trim().isEmpty() + && (nodeLabels == null || !nodeLabels.contains(str.trim()))) { + return false; + } + } + } + return true; + } + + public static boolean checkQueueLabelExpression(Set<String> queueLabels, + String labelExpression) { + if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) { + return true; + } + // if label expression is empty, we can allocate container on any node + if (labelExpression == null) { + return true; + } + for (String str : labelExpression.split("&&")) { + if (!str.trim().isEmpty() + && (queueLabels == null || !queueLabels.contains(str.trim()))) { + return false; + } + } + return true; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java new file mode 100644 index 0000000..7159e4d --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -0,0 +1,448 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.collect.Sets; + +public abstract class AbstractCSQueue implements CSQueue { + + CSQueue parent; + final String queueName; + + float capacity; + float maximumCapacity; + float absoluteCapacity; + float absoluteMaxCapacity; + float absoluteUsedCapacity = 0.0f; + + float usedCapacity = 0.0f; + volatile int numContainers; + + final Resource minimumAllocation; + final Resource maximumAllocation; + QueueState state; + final QueueMetrics metrics; + + final ResourceCalculator resourceCalculator; + Set<String> accessibleLabels; + RMNodeLabelsManager labelManager; + String defaultLabelExpression; + Resource usedResources = Resources.createResource(0, 0); + QueueInfo queueInfo; + Map<String, Float> absoluteCapacityByNodeLabels; + Map<String, Float> capacitiyByNodeLabels; + Map<String, Resource> usedResourcesByNodeLabels = new 
HashMap<String, Resource>(); + Map<String, Float> absoluteMaxCapacityByNodeLabels; + Map<String, Float> maxCapacityByNodeLabels; + + Map<QueueACL, AccessControlList> acls = + new HashMap<QueueACL, AccessControlList>(); + boolean reservationsContinueLooking; + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + public AbstractCSQueue(CapacitySchedulerContext cs, + String queueName, CSQueue parent, CSQueue old) throws IOException { + this.minimumAllocation = cs.getMinimumResourceCapability(); + this.maximumAllocation = cs.getMaximumResourceCapability(); + this.labelManager = cs.getRMContext().getNodeLabelManager(); + this.parent = parent; + this.queueName = queueName; + this.resourceCalculator = cs.getResourceCalculator(); + this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class); + + // must be called after parent and queueName is set + this.metrics = old != null ? old.getMetrics() : + QueueMetrics.forQueue(getQueuePath(), parent, + cs.getConfiguration().getEnableUserMetrics(), + cs.getConf()); + + // get labels + this.accessibleLabels = cs.getConfiguration().getAccessibleNodeLabels(getQueuePath()); + this.defaultLabelExpression = cs.getConfiguration() + .getDefaultNodeLabelExpression(getQueuePath()); + + this.queueInfo.setQueueName(queueName); + + // inherit from parent if labels not set + if (this.accessibleLabels == null && parent != null) { + this.accessibleLabels = parent.getAccessibleNodeLabels(); + SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager, + this.accessibleLabels); + } + + // inherit from parent if labels not set + if (this.defaultLabelExpression == null && parent != null + && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) { + this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); + } + + // set capacity by labels + capacitiyByNodeLabels = + cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), accessibleLabels, + labelManager); + + // set 
maximum capacity by labels + maxCapacityByNodeLabels = + cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(), + accessibleLabels, labelManager); + } + + @Override + public synchronized float getCapacity() { + return capacity; + } + + @Override + public synchronized float getAbsoluteCapacity() { + return absoluteCapacity; + } + + @Override + public float getAbsoluteMaximumCapacity() { + return absoluteMaxCapacity; + } + + @Override + public synchronized float getAbsoluteUsedCapacity() { + return absoluteUsedCapacity; + } + + @Override + public float getMaximumCapacity() { + return maximumCapacity; + } + + @Override + public synchronized float getUsedCapacity() { + return usedCapacity; + } + + @Override + public synchronized Resource getUsedResources() { + return usedResources; + } + + public synchronized int getNumContainers() { + return numContainers; + } + + @Override + public synchronized QueueState getState() { + return state; + } + + @Override + public QueueMetrics getMetrics() { + return metrics; + } + + @Override + public String getQueueName() { + return queueName; + } + + @Override + public synchronized CSQueue getParent() { + return parent; + } + + @Override + public synchronized void setParent(CSQueue newParentQueue) { + this.parent = (ParentQueue)newParentQueue; + } + + public Set<String> getAccessibleNodeLabels() { + return accessibleLabels; + } + + @Override + public boolean hasAccess(QueueACL acl, UserGroupInformation user) { + synchronized (this) { + if (acls.get(acl).isUserAllowed(user)) { + return true; + } + } + + if (parent != null) { + return parent.hasAccess(acl, user); + } + + return false; + } + + @Override + public synchronized void setUsedCapacity(float usedCapacity) { + this.usedCapacity = usedCapacity; + } + + @Override + public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { + this.absoluteUsedCapacity = absUsedCapacity; + } + + /** + * Set maximum capacity - used only for testing. 
+ * @param maximumCapacity new max capacity + */ + synchronized void setMaxCapacity(float maximumCapacity) { + // Sanity check + CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + float absMaxCapacity = + CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, + absMaxCapacity); + + this.maximumCapacity = maximumCapacity; + this.absoluteMaxCapacity = absMaxCapacity; + } + + @Override + public float getAbsActualCapacity() { + // for now, simply return actual capacity = guaranteed capacity for parent + // queue + return absoluteCapacity; + } + + @Override + public String getDefaultNodeLabelExpression() { + return defaultLabelExpression; + } + + synchronized void setupQueueConfigs(Resource clusterResource, float capacity, + float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity, + QueueState state, Map<QueueACL, AccessControlList> acls, + Set<String> labels, String defaultLabelExpression, + Map<String, Float> nodeLabelCapacities, + Map<String, Float> maximumNodeLabelCapacities, + boolean reservationContinueLooking) + throws IOException { + // Sanity check + CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, + absoluteMaxCapacity); + + this.capacity = capacity; + this.absoluteCapacity = absoluteCapacity; + + this.maximumCapacity = maximumCapacity; + this.absoluteMaxCapacity = absoluteMaxCapacity; + + this.state = state; + + this.acls = acls; + + // set labels + this.accessibleLabels = labels; + + // set label expression + this.defaultLabelExpression = defaultLabelExpression; + + // copy node label capacity + this.capacitiyByNodeLabels = new HashMap<String, Float>(nodeLabelCapacities); + this.maxCapacityByNodeLabels = + new HashMap<String, Float>(maximumNodeLabelCapacities); + + this.queueInfo.setAccessibleNodeLabels(this.accessibleLabels); + 
this.queueInfo.setCapacity(this.capacity); + this.queueInfo.setMaximumCapacity(this.maximumCapacity); + this.queueInfo.setQueueState(this.state); + this.queueInfo.setDefaultNodeLabelExpression(this.defaultLabelExpression); + + // Update metrics + CSQueueUtils.updateQueueStatistics( + resourceCalculator, this, parent, clusterResource, minimumAllocation); + + // Check if labels of this queue is a subset of parent queue, only do this + // when we not root + if (parent != null && parent.getParent() != null) { + if (parent.getAccessibleNodeLabels() != null + && !parent.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { + // if parent isn't "*", child shouldn't be "*" too + if (this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { + throw new IOException("Parent's accessible queue is not ANY(*), " + + "but child's accessible queue is *"); + } else { + Set<String> diff = + Sets.difference(this.getAccessibleNodeLabels(), + parent.getAccessibleNodeLabels()); + if (!diff.isEmpty()) { + throw new IOException("Some labels of child queue is not a subset " + + "of parent queue, these labels=[" + + StringUtils.join(diff, ",") + "]"); + } + } + } + } + + // calculate absolute capacity by each node label + this.absoluteCapacityByNodeLabels = + CSQueueUtils.computeAbsoluteCapacityByNodeLabels( + this.capacitiyByNodeLabels, parent); + + // calculate absolute maximum capacity by each node label + this.absoluteMaxCapacityByNodeLabels = + CSQueueUtils.computeAbsoluteMaxCapacityByNodeLabels( + maximumNodeLabelCapacities, parent); + + // check per-label absolute capacity against per-label absolute MAXIMUM capacity (two distinct maps) + CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(), + absoluteCapacityByNodeLabels, absoluteMaxCapacityByNodeLabels); + + this.reservationsContinueLooking = reservationContinueLooking; + } + + @Private + public Resource getMaximumAllocation() { + return maximumAllocation; + } + + @Private + public Resource getMinimumAllocation() { + return minimumAllocation; + } + + synchronized 
void allocateResource(Resource clusterResource,
+      Resource resource, Set<String> nodeLabels) {
+    // Account the allocation against the queue-wide usage
+    Resources.addTo(usedResources, resource);
+
+    // Update usedResources by labels
+    if (nodeLabels == null || nodeLabels.isEmpty()) {
+      // No labels given: book the usage under the NO_LABEL entry
+      if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+        usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
+            Resources.createResource(0));
+      }
+      Resources.addTo(usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL),
+          resource);
+    } else {
+      // Only labels that are also accessible to this queue are updated
+      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
+        if (!usedResourcesByNodeLabels.containsKey(label)) {
+          usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+        }
+        Resources.addTo(usedResourcesByNodeLabels.get(label), resource);
+      }
+    }
+
+    ++numContainers;
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
+        clusterResource, minimumAllocation);
+  }
+
+  /**
+   * Subtract a released resource from this queue's usage, both queue-wide
+   * and per node label, then refresh the queue statistics and decrement the
+   * container count.
+   *
+   * @param clusterResource resource passed to the queue statistics update
+   * @param resource the resource being released
+   * @param nodeLabels labels to update usage for; {@code null} or empty
+   *          means the NO_LABEL entry is updated
+   */
+  protected synchronized void releaseResource(Resource clusterResource,
+      Resource resource, Set<String> nodeLabels) {
+    // Update queue metrics
+    Resources.subtractFrom(usedResources, resource);
+
+    // Update usedResources by labels
+    if (null == nodeLabels || nodeLabels.isEmpty()) {
+      if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+        usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
+            Resources.createResource(0));
+      }
+      Resources.subtractFrom(
+          usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL), resource);
+    } else {
+      // Only labels that are also accessible to this queue are updated
+      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
+        if (!usedResourcesByNodeLabels.containsKey(label)) {
+          usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+        }
+        Resources.subtractFrom(usedResourcesByNodeLabels.get(label), resource);
+      }
+    }
+
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
+        clusterResource, minimumAllocation);
+    --numContainers;
+  }
+
+  /**
+   * Get the capacity of this queue for the given node label.
+   *
+   * @param label node label to look up
+   * @return 1 when this queue has no parent, the queue capacity for
+   *         NO_LABEL, otherwise the stored per-label capacity (0 when the
+   *         label has no entry)
+   */
+  @Private
+  public float getCapacityByNodeLabel(String label) {
+    if (null == parent) {
+      return 1f;
+    }
+
+    if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      return getCapacity();
+    }
+
+    if (!capacitiyByNodeLabels.containsKey(label)) {
+      return 0;
+    } else {
+      return capacitiyByNodeLabels.get(label);
+    }
+  }
+
+  /**
+   * Get the absolute capacity of this queue for the given node label.
+   *
+   * @param label node label to look up
+   * @return 1 when this queue has no parent, the absolute queue capacity
+   *         for NO_LABEL, otherwise the stored per-label absolute capacity
+   *         (0 when the label has no entry)
+   */
+  @Private
+  public float getAbsoluteCapacityByNodeLabel(String label) {
+    if (null == parent) {
+      return 1;
+    }
+
+    if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      return getAbsoluteCapacity();
+    }
+
+    // Read the guaranteed capacity map here; the maximum capacity map is
+    // served by getAbsoluteMaximumCapacityByNodeLabel
+    Float absCapacity = absoluteCapacityByNodeLabels.get(label);
+    return (absCapacity == null) ? 0f : absCapacity;
+  }
+
+  /**
+   * Get the absolute maximum capacity of this queue for the given node
+   * label.
+   *
+   * @param label node label to look up
+   * @return the absolute maximum queue capacity for NO_LABEL, 1 when this
+   *         queue has no parent, otherwise the stored per-label absolute
+   *         maximum capacity (0 when the label has no entry)
+   */
+  @Private
+  public float getAbsoluteMaximumCapacityByNodeLabel(String label) {
+    if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      return getAbsoluteMaximumCapacity();
+    }
+
+    if (null == parent) {
+      return 1;
+    }
+
+    Float absMaxCapacity = absoluteMaxCapacityByNodeLabels.get(label);
+    return (absMaxCapacity == null) ? 0f : absMaxCapacity;
+  }
+
+  @Private
+  public boolean getReservationContinueLooking() {
+    return reservationsContinueLooking;
+  }
+
+  @Private
+  public Map<QueueACL, AccessControlList> getACLs() {
+    return acls;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index db893dc..6438d6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -72,9 +72,18 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
   /**
    * Get the configured <em>capacity</em> of the queue.
-   * @return queue capacity
+   * @return configured queue capacity
    */
   public float getCapacity();
+
+  /**
+   * Get the actual <em>capacity</em> of the queue. This may differ from the
+   * configured capacity when a misconfiguration takes place, e.g. when
+   * labels are added to the cluster.
+   *
+   * @return actual queue capacity
+   */
+  public float getAbsActualCapacity();
 
   /**
    * Get capacity of the parent of the queue as a function of the
@@ -106,28 +115,31 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
   public float getAbsoluteUsedCapacity();
 
   /**
-   * Get the current used capacity of the queue
-   * and it's children (if any).
-   * @return queue used capacity
-   */
-  public float getUsedCapacity();
-
-  /**
    * Set used capacity of the queue.
-   * @param usedCapacity used capacity of the queue
+   * @param usedCapacity
+   *          used capacity of the queue
    */
   public void setUsedCapacity(float usedCapacity);
-
+
   /**
    * Set absolute used capacity of the queue.
-   * @param absUsedCapacity absolute used capacity of the queue
+   * @param absUsedCapacity
+   *          absolute used capacity of the queue
    */
   public void setAbsoluteUsedCapacity(float absUsedCapacity);
 
   /**
-   * Get the currently utilized resources in the cluster
-   * by the queue and children (if any).
-   * @return used resources by the queue and it's children
+   * Get the current used capacity of nodes without label(s) of the queue
+   * and its children (if any).
+   * @return queue used capacity
+   */
+  public float getUsedCapacity();
+
+  /**
+   * Get the currently utilized resources which are allocated on nodes
+   * without any labels in the cluster by the queue and children (if any).
+   *
+   * @return used resources by the queue and its children
    */
   public Resource getUsedResources();
 
@@ -259,4 +271,25 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    */
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer container);
+
+  /**
+   * Get the absolute capacity, for the given node label, that this queue
+   * can use.
+   * @param nodeLabel the node label to look up
+   * @return absolute capacity by label that this queue can use
+   */
+  public float getAbsoluteCapacityByNodeLabel(String nodeLabel);
+
+  /**
+   * Get the absolute maximum capacity, for the given node label, that this
+   * queue can use.
+   * @param nodeLabel the node label to look up
+   * @return absolute maximum capacity by label that this queue can use
+   */
+  public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel);
+
+  /**
+   * Get capacity by node label.
+   * @param nodeLabel the node label to look up
+   * @return capacity by node label
+   */
+  public float getCapacityByNodeLabel(String nodeLabel);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 737062b..0a2fa3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -40,7 +43,7 @@ class CSQueueUtils {
     }
   }
 
-  public static void checkAbsoluteCapacities(String queueName,
+  public static void checkAbsoluteCapacity(String queueName,
       float absCapacity, float absMaxCapacity) {
     if (absMaxCapacity < (absCapacity - EPSILON)) {
       throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
@@ -49,6 +52,23 @@ class CSQueueUtils {
           + ")");
     }
   }
+
+  /**
+   * Check, label by label, that no absolute capacity exceeds the absolute
+   * maximum capacity of the same label (within EPSILON).
+   *
+   * @param queueName queue name used in the error message
+   * @param absCapacities absolute capacity by node label
+   * @param absMaximumCapacities absolute maximum capacity by node label
+   * @throws IllegalArgumentException if a label's absolute capacity is
+   *           greater than its absolute maximum capacity
+   */
+  public static void checkAbsoluteCapacitiesByLabel(String queueName,
+          Map<String, Float> absCapacities,
+          Map<String, Float> absMaximumCapacities) {
+    for (Entry<String, Float> entry : absCapacities.entrySet()) {
+      String label = entry.getKey();
+      float absCapacity = entry.getValue();
+      // A label without a maximum entry is treated as having a maximum of
+      // 0 instead of failing with a NullPointerException while unboxing
+      Float configuredMax = absMaximumCapacities.get(label);
+      float absMaxCapacity = (configuredMax == null) ? 0f : configuredMax;
+      if (absMaxCapacity < (absCapacity - EPSILON)) {
+        throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
+            + "Queue '" + queueName + "' has " + "an absolute capacity ("
+            + absCapacity + ") greater than "
+            + "its absolute maximumCapacity (" + absMaxCapacity + ") of label="
+            + label);
+      }
+    }
+  }
 
   public static float computeAbsoluteMaximumCapacity(
       float maximumCapacity, CSQueue parent) {
@@ -56,6 +76,39 @@ class CSQueueUtils {
         (parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity();
     return (parentAbsMaxCapacity * maximumCapacity);
   }
+
+  /**
+   * Compute the absolute capacity of each node label by multiplying the
+   * given capacity with the parent's absolute capacity for the same label.
+   *
+   * @param nodeLabelToCapacities capacity by node label
+   * @param parent parent queue, or {@code null}
+   * @return the given map itself (not a copy) when {@code parent} is
+   *         {@code null}, otherwise a new map of absolute capacities
+   */
+  public static Map<String, Float> computeAbsoluteCapacityByNodeLabels(
+      Map<String, Float> nodeLabelToCapacities, CSQueue parent) {
+    if (parent == null) {
+      return nodeLabelToCapacities;
+    }
+
+    Map<String, Float> absoluteCapacityByNodeLabels =
+        new HashMap<String, Float>();
+    for (Entry<String, Float> entry : nodeLabelToCapacities.entrySet()) {
+      String label = entry.getKey();
+      float capacity = entry.getValue();
+      absoluteCapacityByNodeLabels.put(label,
+          capacity * parent.getAbsoluteCapacityByNodeLabel(label));
+    }
+    return absoluteCapacityByNodeLabels;
+  }
+
+  /**
+   * Compute the absolute maximum capacity of each node label by multiplying
+   * the given maximum capacity with the parent's absolute maximum capacity
+   * for the same label.
+   *
+   * @param maximumNodeLabelToCapacities maximum capacity by node label
+   * @param parent parent queue, or {@code null}
+   * @return the given map itself (not a copy) when {@code parent} is
+   *         {@code null}, otherwise a new map of absolute maximum capacities
+   */
+  public static Map<String, Float> computeAbsoluteMaxCapacityByNodeLabels(
+      Map<String, Float> maximumNodeLabelToCapacities, CSQueue parent) {
+    if (parent == null) {
+      return maximumNodeLabelToCapacities;
+    }
+    Map<String, Float> absoluteMaxCapacityByNodeLabels =
+        new HashMap<String, Float>();
+    for (Entry<String, Float> entry : maximumNodeLabelToCapacities.entrySet()) {
+      String label = entry.getKey();
+      float maxCapacity = entry.getValue();
+      absoluteMaxCapacityByNodeLabels.put(label,
+          maxCapacity * parent.getAbsoluteMaximumCapacityByNodeLabel(label));
+    }
+    return absoluteMaxCapacityByNodeLabels;
+  }
 
   public static int computeMaxActiveApplications(
       ResourceCalculator calculator,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e8e3a362/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ed5518c..9332228 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -20,7 +20,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.io.InputStream; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -53,8 +61,13 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -191,6 +204,7 @@ public class CapacityScheduler extends
 
   private boolean scheduleAsynchronously;
   private AsyncScheduleThread asyncSchedulerThread;
+  private RMNodeLabelsManager labelManager;
 
   /**
    * EXPERT
@@ -275,6 +289,8 @@ public class CapacityScheduler extends
     this.applications =
         new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
 
+    this.labelManager = rmContext.getNodeLabelManager();
+
     initializeQueues(this.conf);
 
     scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -446,7 +462,7 @@ public class CapacityScheduler extends
     root =
         parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
             queues, queues, noop);
-
+    // NOTE(review): addNode/removeNode guard labelManager against null, but
+    // this call does not -- confirm labelManager is always set here
+    labelManager.reinitializeQueueLabels(getQueueToLabels());
     LOG.info("Initialized root queue " + root);
     initializeQueueMappings();
   }
@@ -469,10 +485,19 @@ public class CapacityScheduler extends
     // Re-configure queues
     root.reinitialize(newRoot, clusterResource);
     initializeQueueMappings();
-
+
     // Re-calculate headroom for active applications
     root.updateClusterResource(clusterResource);
 
+    // NOTE(review): addNode/removeNode guard labelManager against null, but
+    // this call does not -- confirm labelManager is always set here
+    labelManager.reinitializeQueueLabels(getQueueToLabels());
+  }
+
+  /**
+   * Build a map from the name of every queue in {@code queues} to that
+   * queue's accessible node labels.
+   *
+   * @return a new map of queue name to accessible node labels
+   */
+  private Map<String, Set<String>> getQueueToLabels() {
+    Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
+    for (CSQueue queue : queues.values()) {
+      queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
+    }
+    return queueToLabels;
   }
 
   /**
@@ -515,7 +540,7 @@ public class CapacityScheduler extends
 
   @Lock(CapacityScheduler.class)
   static CSQueue parseQueue(
-      CapacitySchedulerContext csContext,
+      CapacitySchedulerContext csContext,
       CapacitySchedulerConfiguration conf,
       CSQueue parent, String queueName, Map<String, CSQueue> queues,
       Map<String, CSQueue> oldQueues,
@@ -1094,11 +1119,18 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addNode(RMNode nodeManager) {
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.activateNode(nodeManager.getNodeID(),
+          nodeManager.getTotalCapability());
+    }
+
     this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
         usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.incrementAndGet();
+
     LOG.info("Added node " + nodeManager.getNodeAddress() +
         " clusterResource: " + clusterResource);
 
@@ -1108,6 +1140,11 @@ public class CapacityScheduler extends
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.deactivateNode(nodeInfo.getNodeID());
+    }
+
     FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
     if (node == null) {
       return;
@@ -1141,6 +1178,7 @@ public class CapacityScheduler extends
     }
 
     this.nodes.remove(nodeInfo.getNodeID());
+
     LOG.info("Removed node " + nodeInfo.getNodeAddress() +
         " clusterResource: " + clusterResource);
   }