YARN-8468. Enable the use of queue based maximum container allocation limit and implement it in FairScheduler. Contributed by Antal Bálint Steinbach.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fd6be589 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fd6be589 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fd6be589 Branch: refs/heads/trunk Commit: fd6be5898ad1a650e3bceacb8169a53520da57e5 Parents: 0da03f8 Author: Weiwei Yang <w...@apache.org> Authored: Sat Sep 29 17:47:12 2018 +0800 Committer: Weiwei Yang <w...@apache.org> Committed: Sat Sep 29 17:47:12 2018 +0800 ---------------------------------------------------------------------- .../server/resourcemanager/RMAppManager.java | 11 +- .../server/resourcemanager/RMServerUtils.java | 9 +- .../scheduler/AbstractYarnScheduler.java | 21 ++- .../scheduler/SchedulerUtils.java | 62 +++---- .../scheduler/YarnScheduler.java | 9 +- .../scheduler/capacity/CapacityScheduler.java | 7 +- .../processor/PlacementConstraintProcessor.java | 12 +- .../scheduler/fair/AllocationConfiguration.java | 13 ++ .../scheduler/fair/FSLeafQueue.java | 10 ++ .../scheduler/fair/FSParentQueue.java | 15 +- .../resourcemanager/scheduler/fair/FSQueue.java | 7 + .../scheduler/fair/FairScheduler.java | 52 ++++-- .../allocation/AllocationFileQueueParser.java | 7 + .../fair/allocation/QueueProperties.java | 16 +- .../webapp/FairSchedulerPage.java | 4 + .../webapp/dao/FairSchedulerQueueInfo.java | 9 +- .../resourcemanager/AppManagerTestBase.java | 107 +++++++++++ .../yarn/server/resourcemanager/MockRM.java | 8 + .../server/resourcemanager/TestAppManager.java | 89 ++-------- .../TestAppManagerWithFairScheduler.java | 177 +++++++++++++++++++ .../resourcemanager/TestClientRMService.java | 104 +++++------ .../resourcemanager/TestRMServerUtils.java | 98 +++++++++- .../scheduler/TestSchedulerUtils.java | 144 ++++++++------- .../capacity/TestCapacityScheduler.java | 39 +++- .../scheduler/fair/FairSchedulerTestBase.java | 4 +- .../fair/TestAllocationFileLoaderService.java | 37 +++- .../TestApplicationMasterServiceWithFS.java | 174 ++++++++++++++++++ .../scheduler/fair/TestFairScheduler.java | 140 +++++++++++++-- .../allocationfile/AllocationFileQueue.java | 6 +- .../AllocationFileQueueBuilder.java | 6 + .../AllocationFileQueueProperties.java | 12 ++ .../src/site/markdown/FairScheduler.md | 2 + 32 files changed, 1115 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index d0f2ce6..783fab0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -567,13 +568,13 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>, // Normalize all requests String queue = submissionContext.getQueue(); + Resource maxAllocation = scheduler.getMaximumResourceCapability(queue); for (ResourceRequest amReq : amReqs) { - SchedulerUtils.normalizeAndValidateRequest(amReq, - scheduler.getMaximumResourceCapability(queue), - queue, scheduler, isRecovery, rmContext); + SchedulerUtils.normalizeAndValidateRequest(amReq, maxAllocation, + queue, scheduler, isRecovery, rmContext, null); - amReq.setCapability( - scheduler.getNormalizedResource(amReq.getCapability())); + amReq.setCapability(scheduler.getNormalizedResource( + amReq.getCapability(), maxAllocation)); } return amReqs; } catch (InvalidResourceRequestException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index ab6bbcf..b18b12e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -97,7 +97,7 @@ public class RMServerUtils { "INCORRECT_CONTAINER_VERSION_ERROR"; private static final String INVALID_CONTAINER_ID = "INVALID_CONTAINER_ID"; - private static final String RESOURCE_OUTSIDE_ALLOWED_RANGE = + public static final String RESOURCE_OUTSIDE_ALLOWED_RANGE = "RESOURCE_OUTSIDE_ALLOWED_RANGE"; protected static final RecordFactory RECORD_FACTORY = @@ -235,7 +235,7 @@ public class RMServerUtils { * requested memory/vcore is non-negative and not greater than max */ public static void normalizeAndValidateRequests(List<ResourceRequest> ask, - Resource maximumResource, String queueName, YarnScheduler scheduler, + Resource maximumAllocation, String queueName, YarnScheduler scheduler, RMContext rmContext) throws InvalidResourceRequestException { // Get queue from scheduler QueueInfo queueInfo = null; @@ -247,7 +247,7 @@ public class RMServerUtils { } for (ResourceRequest resReq : ask) { - SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource, + SchedulerUtils.normalizeAndValidateRequest(resReq, maximumAllocation, queueName, scheduler, rmContext, queueInfo); } } @@ -338,7 +338,8 @@ public class RMServerUtils { return false; } ResourceScheduler scheduler = rmContext.getScheduler(); - request.setCapability(scheduler.getNormalizedResource(request.getCapability())); + request.setCapability(scheduler + .getNormalizedResource(request.getCapability(), maximumAllocation)); return true; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 9d2b058..0acfca7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -1159,11 +1159,12 @@ public abstract class AbstractYarnScheduler } @Override - public Resource getNormalizedResource(Resource requestedResource) { + public Resource getNormalizedResource(Resource requestedResource, + Resource maxResourceCapability) { return SchedulerUtils.getNormalizedResource(requestedResource, getResourceCalculator(), getMinimumResourceCapability(), - getMaximumResourceCapability(), + maxResourceCapability, getMinimumResourceCapability()); } @@ -1173,8 +1174,20 @@ public abstract class AbstractYarnScheduler * @param asks resource requests */ protected void normalizeResourceRequests(List<ResourceRequest> asks) { - for (ResourceRequest ask: asks) { - ask.setCapability(getNormalizedResource(ask.getCapability())); + normalizeResourceRequests(asks, null); + } + + /** + * Normalize a list of resource requests + * using queue maximum resource allocations. + * @param asks resource requests + */ + protected void normalizeResourceRequests(List<ResourceRequest> asks, + String queueName) { + Resource maxAllocation = getMaximumResourceCapability(queueName); + for (ResourceRequest ask : asks) { + ask.setCapability( + getNormalizedResource(ask.getCapability(), maxAllocation)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 9b07d37..9a02b6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -114,19 +114,19 @@ public class SchedulerUtils { public static final String UPDATED_CONTAINER = "Temporary container killed by application for ExeutionType update"; - - public static final String LOST_CONTAINER = + + public static final String LOST_CONTAINER = "Container released on a *lost* node"; - - public static final String PREEMPTED_CONTAINER = + + public static final String PREEMPTED_CONTAINER = "Container preempted by scheduler"; - - public static final String COMPLETED_APPLICATION = + + public static final String COMPLETED_APPLICATION = "Container of a completed application"; - + public static final String EXPIRED_CONTAINER = "Container expired since it was unused"; - + public static final String UNRESERVED_CONTAINER = "Container reservation no longer required."; @@ -141,7 +141,7 @@ public class SchedulerUtils { */ public static ContainerStatus createAbnormalContainerStatus( ContainerId containerId, String diagnostics) { - return createAbnormalContainerStatus(containerId, + return createAbnormalContainerStatus(containerId, ContainerExitStatus.ABORTED, diagnostics); } @@ -169,14 +169,14 @@ public class SchedulerUtils { */ public static ContainerStatus createPreemptedContainerStatus( ContainerId containerId, String diagnostics) { - return createAbnormalContainerStatus(containerId, + return createAbnormalContainerStatus(containerId, ContainerExitStatus.PREEMPTED, diagnostics); } /** * Utility to create a {@link ContainerStatus} during exceptional * circumstances. - * + * * @param containerId {@link ContainerId} of returned/released/lost container. * @param diagnostics diagnostic message * @return <code>ContainerStatus</code> for an returned/released/lost @@ -184,7 +184,7 @@ public class SchedulerUtils { */ private static ContainerStatus createAbnormalContainerStatus( ContainerId containerId, int exitStatus, String diagnostics) { - ContainerStatus containerStatus = + ContainerStatus containerStatus = recordFactory.newRecordInstance(ContainerStatus.class); containerStatus.setContainerId(containerId); containerStatus.setDiagnostics(diagnostics); @@ -254,23 +254,14 @@ public class SchedulerUtils { labelExp = RMNodeLabelsManager.NO_LABEL; } - if ( labelExp != null) { + if (labelExp != null) { resReq.setNodeLabelExpression(labelExp); } } public static void normalizeAndValidateRequest(ResourceRequest resReq, - Resource maximumResource, String queueName, YarnScheduler scheduler, - boolean isRecovery, RMContext rmContext) - throws InvalidResourceRequestException { - normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler, - isRecovery, rmContext, null); - } - - - private static void normalizeAndValidateRequest(ResourceRequest resReq, - Resource maximumResource, String queueName, YarnScheduler scheduler, - boolean isRecovery, RMContext rmContext, QueueInfo queueInfo) + Resource maximumAllocation, String queueName, YarnScheduler scheduler, + boolean isRecovery, RMContext rmContext, QueueInfo queueInfo) throws InvalidResourceRequestException { Configuration conf = rmContext.getYarnConfiguration(); // If Node label is not enabled throw exception @@ -299,37 +290,30 @@ public class SchedulerUtils { SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo); if (!isRecovery) { - validateResourceRequest(resReq, maximumResource, queueInfo, rmContext); + validateResourceRequest(resReq, maximumAllocation, queueInfo, rmContext); } } - public static void normalizeAndvalidateRequest(ResourceRequest resReq, - Resource maximumResource, String queueName, YarnScheduler scheduler, - RMContext rmContext) throws InvalidResourceRequestException { - normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler, - rmContext, null); - } - - public static void normalizeAndvalidateRequest(ResourceRequest resReq, - Resource maximumResource, String queueName, YarnScheduler scheduler, + public static void normalizeAndValidateRequest(ResourceRequest resReq, + Resource maximumAllocation, String queueName, YarnScheduler scheduler, RMContext rmContext, QueueInfo queueInfo) throws InvalidResourceRequestException { - normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler, + normalizeAndValidateRequest(resReq, maximumAllocation, queueName, scheduler, false, rmContext, queueInfo); } /** * Utility method to validate a resource request, by insuring that the * requested memory/vcore is non-negative and not greater than max - * + * * @throws InvalidResourceRequestException when there is invalid request */ private static void validateResourceRequest(ResourceRequest resReq, - Resource maximumResource, QueueInfo queueInfo, RMContext rmContext) + Resource maximumAllocation, QueueInfo queueInfo, RMContext rmContext) throws InvalidResourceRequestException { final Resource requestedResource = resReq.getCapability(); checkResourceRequestAgainstAvailableResource(requestedResource, - maximumResource); + maximumAllocation); String labelExp = resReq.getNodeLabelExpression(); // we don't allow specify label expression other than resourceName=ANY now @@ -535,7 +519,7 @@ public class SchedulerUtils { if (!str.trim().isEmpty()) { // check queue label if (queueLabels == null) { - return false; + return false; } else { if (!queueLabels.contains(str) && !queueLabels.contains(RMNodeLabelsManager.ANY)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 0f7a5b5..d95fe7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -390,12 +390,17 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { SchedulerNode getSchedulerNode(NodeId nodeId); /** - * Normalize a resource request. + * Normalize a resource request using scheduler level maximum resource or + * queue based maximum resource. * * @param requestedResource the resource to be normalized + * @param maxResourceCapability Maximum container allocation value, if null or + * empty scheduler level maximum container allocation value will be + * used * @return the normalized resource */ - Resource getNormalizedResource(Resource requestedResource); + Resource getNormalizedResource(Resource requestedResource, + Resource maxResourceCapability); /** * Verify whether a submitted application lifetime is valid as per configured http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 4b274df..75d6144 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1159,10 +1159,12 @@ public class CapacityScheduler extends if (asks == null) { return; } + Resource maxAllocation = getMaximumResourceCapability(); for (SchedulingRequest ask: asks) { ResourceSizing sizing = ask.getResourceSizing(); if (sizing != null && sizing.getResources() != null) { - sizing.setResources(getNormalizedResource(sizing.getResources())); + sizing.setResources( + getNormalizedResource(sizing.getResources(), maxAllocation)); } } } @@ -2567,6 +2569,9 @@ public class CapacityScheduler extends @Override public Resource getMaximumResourceCapability(String queueName) { + if(queueName == null || queueName.isEmpty()) { + return getMaximumResourceCapability(); + } CSQueue queue = getQueue(queueName); if (queue == null) { LOG.error("Unknown queue: " + queueName); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java index cf944a6..c4c9574 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java @@ -175,11 +175,19 @@ public class PlacementConstraintProcessor extends AbstractPlacementProcessor { private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId, List<SchedulingRequest> schedulingRequests) { if (schedulingRequests != null && !schedulingRequests.isEmpty()) { + SchedulerApplicationAttempt appAttempt = + scheduler.getApplicationAttempt(appAttemptId); + String queueName = null; + if(appAttempt != null) { + queueName = appAttempt.getQueueName(); + } + Resource maxAllocation = + scheduler.getMaximumResourceCapability(queueName); // Normalize the Requests before dispatching schedulingRequests.forEach(req -> { Resource reqResource = req.getResourceSizing().getResources(); - req.getResourceSizing() - .setResources(this.scheduler.getNormalizedResource(reqResource)); + req.getResourceSizing().setResources( + this.scheduler.getNormalizedResource(reqResource, maxAllocation)); }); this.placementDispatcher.dispatch(new BatchedRequests(iteratorType, appAttemptId.getApplicationId(), schedulingRequests, 1)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index e48e04b..826d9f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -91,6 +91,9 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { private final SchedulingPolicy defaultSchedulingPolicy; + //Map for maximum container resource allocation per queues by queue name + private final Map<String, Resource> queueMaxContainerAllocationMap; + // Policy for mapping apps to queues @VisibleForTesting QueuePlacementPolicy placementPolicy; @@ -138,6 +141,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { this.placementPolicy = newPlacementPolicy; this.configuredQueues = queueProperties.getConfiguredQueues(); this.nonPreemptableQueues = queueProperties.getNonPreemptableQueues(); + this.queueMaxContainerAllocationMap = + queueProperties.getMaxContainerAllocation(); } public AllocationConfiguration(Configuration conf) { @@ -167,6 +172,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); nonPreemptableQueues = new HashSet<>(); + queueMaxContainerAllocationMap = new HashMap<>(); } /** @@ -272,6 +278,12 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { return maxQueueResource; } + @VisibleForTesting + Resource getQueueMaxContainerAllocation(String queue) { + Resource resource = queueMaxContainerAllocationMap.get(queue); + return resource == null ? Resources.unbounded() : resource; + } + /** * Get the maximum resource allocation for children of the given queue. * @@ -375,6 +387,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { queue.setMaxRunningApps(getQueueMaxApps(name)); queue.setMaxAMShare(getQueueMaxAMShare(name)); queue.setMaxChildQueueResource(getMaxChildResources(name)); + queue.setMaxContainerAllocation(getQueueMaxContainerAllocation(name)); // Set queue metrics. queue.getMetrics().setMinShare(queue.getMinShare()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index cbc74d2..7e4dab8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -547,6 +547,16 @@ public class FSLeafQueue extends FSQueue { this.weights = weight; } + @Override + public Resource getMaximumContainerAllocation() { + if (maxContainerAllocation.equals(Resources.unbounded()) + && getParent() != null) { + return getParent().getMaximumContainerAllocation(); + } else { + return maxContainerAllocation; + } + } + /** * Helper method to compute the amount of minshare starvation. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index d5df549..e9f4af6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -59,7 +59,20 @@ public class FSParentQueue extends FSQueue { FSParentQueue parent) { super(name, scheduler, parent); } - + + @Override + public Resource getMaximumContainerAllocation() { + if (getName().equals("root")) { + return maxContainerAllocation; + } + if (maxContainerAllocation.equals(Resources.unbounded()) + && getParent() != null) { + return getParent().getMaximumContainerAllocation(); + } else { + return maxContainerAllocation; + } + } + void addChildQueue(FSQueue child) { writeLock.lock(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 6b88a32..6217f55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -84,6 +84,7 @@ public abstract class FSQueue implements Queue, Schedulable { private float fairSharePreemptionThreshold = 0.5f; private boolean preemptable = true; private boolean isDynamic = true; + protected Resource maxContainerAllocation; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -163,6 +164,12 @@ public abstract class FSQueue implements Queue, Schedulable { this.maxShare = maxShare; } + public void setMaxContainerAllocation(Resource maxContainerAllocation){ + this.maxContainerAllocation = maxContainerAllocation; + } + + public abstract Resource getMaximumContainerAllocation(); + @Override public Resource getMaxShare() { Resource maxResource = maxShare.getResource(scheduler.getClusterResource()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 43a47ae..da5e4c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -192,6 +192,7 @@ public class FairScheduler extends protected long rackLocalityDelayMs; // Delay for rack locality protected boolean assignMultiple; // Allocate multiple containers per // heartbeat + @VisibleForTesting boolean maxAssignDynamic; protected int maxAssign; // Max containers to assign per heartbeat @@ -227,12 +228,12 @@ public class FairScheduler extends private void validateConf(FairSchedulerConfiguration config) { // validate scheduler memory allocation setting - int minMem = config.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); - int maxMem = config.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + int minMem = + config.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + int maxMem = + config.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); if (minMem < 0 || minMem > maxMem) { throw new YarnRuntimeException("Invalid resource scheduler memory" @@ -254,12 +255,12 @@ public class FairScheduler extends } // validate scheduler vcores allocation setting - int minVcores = config.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); - int maxVcores = config.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + int minVcores = + config.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + int maxVcores = + config.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); if (minVcores < 0 || minVcores > maxVcores) { throw new YarnRuntimeException("Invalid resource scheduler vcores" @@ -833,14 +834,35 @@ public class FairScheduler extends } @Override - public Resource getNormalizedResource(Resource requestedResource) { + public Resource getNormalizedResource(Resource requestedResource, + Resource maxResourceCapability) { return SchedulerUtils.getNormalizedResource(requestedResource, DOMINANT_RESOURCE_CALCULATOR, minimumAllocation, - getMaximumResourceCapability(), + maxResourceCapability, incrAllocation); } + @Override + public Resource getMaximumResourceCapability(String queueName) { + if(queueName == null || queueName.isEmpty()) { + return getMaximumResourceCapability(); + } + FSQueue queue = queueMgr.getQueue(queueName); + Resource schedulerLevelMaxResourceCapability = + getMaximumResourceCapability(); + if (queue == null) { + return schedulerLevelMaxResourceCapability; + } + Resource queueMaxResourceCapability = queue.getMaximumContainerAllocation(); + if (queueMaxResourceCapability.equals(Resources.unbounded())) { + return schedulerLevelMaxResourceCapability; + } else { + return Resources.componentwiseMin(schedulerLevelMaxResourceCapability, + queueMaxResourceCapability); + } + } + @VisibleForTesting @Override public void killContainer(RMContainer container) { @@ -897,7 +919,7 @@ public class FairScheduler extends handleContainerUpdates(application, updateRequests); // Sanity check - normalizeResourceRequests(ask); + normalizeResourceRequests(ask, queue.getName()); // TODO, normalize SchedulingRequest http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java index 441c34a..72c6c68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java @@ -51,6 +51,8 @@ public class AllocationFileQueueParser { private static final String MAX_CHILD_RESOURCES = "maxChildResources"; private static final String MAX_RUNNING_APPS = "maxRunningApps"; private static final String MAX_AMSHARE = "maxAMShare"; + public static final String MAX_CONTAINER_ALLOCATION = + "maxContainerAllocation"; private static final String WEIGHT = "weight"; private static final String MIN_SHARE_PREEMPTION_TIMEOUT = "minSharePreemptionTimeout"; @@ -155,6 +157,11 @@ public class AllocationFileQueueParser { float val = Float.parseFloat(text); val = Math.min(val, 1.0f); builder.queueMaxAMShares(queueName, val); + } else if (MAX_CONTAINER_ALLOCATION.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + ConfigurableResource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + builder.queueMaxContainerAllocation(queueName, val.getResource()); } else if (WEIGHT.equals(field.getTagName())) { String text = getTrimmedTextData(field); double val = Double.parseDouble(text); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java index ee5f179..badf05f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java @@ -53,6 +53,7 @@ public class QueueProperties { private final Set<String> reservableQueues; private final Set<String> nonPreemptableQueues; private final Map<FSQueueType, Set<String>> configuredQueues; + private final Map<String, Resource> queueMaxContainerAllocation; QueueProperties(Builder builder) { this.reservableQueues = builder.reservableQueues; @@ -70,6 +71,7 @@ public class QueueProperties { this.maxChildQueueResources = builder.maxChildQueueResources; this.reservationAcls = builder.reservationAcls; this.queueAcls = builder.queueAcls; + this.queueMaxContainerAllocation = builder.queueMaxContainerAllocation; } public Map<FSQueueType, Set<String>> getConfiguredQueues() { @@ -133,7 +135,11 @@ public class QueueProperties { return nonPreemptableQueues; } - /** + public Map<String, Resource> getMaxContainerAllocation() { + return queueMaxContainerAllocation; + } + + /** * Builder class for {@link QueueProperties}. * All methods are adding queue properties to the maps of this builder * keyed by the queue's name except some methods @@ -149,6 +155,7 @@ public class QueueProperties { new HashMap<>(); private Map<String, Integer> queueMaxApps = new HashMap<>(); private Map<String, Float> queueMaxAMShares = new HashMap<>(); + private Map<String, Resource> queueMaxContainerAllocation = new HashMap<>(); private Map<String, Float> queueWeights = new HashMap<>(); private Map<String, SchedulingPolicy> queuePolicies = new HashMap<>(); private Map<String, Long> minSharePreemptionTimeouts = new HashMap<>(); @@ -253,6 +260,12 @@ public class QueueProperties { return this; } + public Builder queueMaxContainerAllocation(String queueName, + Resource value) { + queueMaxContainerAllocation.put(queueName, value); + return this; + } + public void configuredQueues(FSQueueType queueType, String queueName) { this.configuredQueues.get(queueType).add(queueName); } @@ -275,6 +288,5 @@ public class QueueProperties { public QueueProperties build() { return new QueueProperties(this); } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java index ef417d4..7f31def 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java @@ -78,6 +78,8 @@ public class FairSchedulerPage extends RmView { __("Num Pending Applications:", qinfo.getNumPendingApplications()). __("Min Resources:", qinfo.getMinResources().toString()). __("Max Resources:", qinfo.getMaxResources().toString()). + __("Max Container Allocation:", + qinfo.getMaxContainerAllocation().toString()). __("Reserved Resources:", qinfo.getReservedResources().toString()); int maxApps = qinfo.getMaxApplications(); if (maxApps < Integer.MAX_VALUE) { @@ -107,6 +109,8 @@ public class FairSchedulerPage extends RmView { __("Used Resources:", qinfo.getUsedResources().toString()). __("Min Resources:", qinfo.getMinResources().toString()). __("Max Resources:", qinfo.getMaxResources().toString()). + __("Max Container Allocation:", + qinfo.getMaxContainerAllocation().toString()). __("Reserved Resources:", qinfo.getReservedResources().toString()); int maxApps = qinfo.getMaxApplications(); if (maxApps < Integer.MAX_VALUE) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index 913513c..70c5fd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -60,6 +60,7 @@ public class FairSchedulerQueueInfo { private ResourceInfo fairResources; private ResourceInfo clusterResources; private ResourceInfo reservedResources; + private ResourceInfo maxContainerAllocation; private long allocatedContainers; private long reservedContainers; @@ -99,6 +100,8 @@ public class FairSchedulerQueueInfo { maxResources = new ResourceInfo( Resources.componentwiseMin(queue.getMaxShare(), scheduler.getClusterResource())); + maxContainerAllocation = + new ResourceInfo(scheduler.getMaximumResourceCapability(queueName)); reservedResources = new ResourceInfo(queue.getReservedResource()); fractionMemSteadyFairShare = @@ -186,7 +189,11 @@ public class FairSchedulerQueueInfo { public ResourceInfo getMaxResources() { return maxResources; } - + + public ResourceInfo getMaxContainerAllocation() { + return maxContainerAllocation; + } + public ResourceInfo getReservedResources() { return reservedResources; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java new file mode 100644 index 0000000..36258b4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static java.util.stream.Collectors.toSet; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.mockito.ArgumentCaptor; + +/** + * Base class for AppManager related test. + * + */ +public class AppManagerTestBase { + + // Extend and make the functions we want to test public + protected class TestRMAppManager extends RMAppManager { + private final RMStateStore stateStore; + + public TestRMAppManager(RMContext context, Configuration conf) { + super(context, null, null, new ApplicationACLsManager(conf), conf); + this.stateStore = context.getStateStore(); + } + + public TestRMAppManager(RMContext context, + ClientToAMTokenSecretManagerInRM clientToAMSecretManager, + YarnScheduler scheduler, ApplicationMasterService masterService, + ApplicationACLsManager applicationACLsManager, Configuration conf) { + super(context, scheduler, masterService, applicationACLsManager, conf); + this.stateStore = context.getStateStore(); + } + + public void checkAppNumCompletedLimit() { + super.checkAppNumCompletedLimit(); + } + + public void finishApplication(ApplicationId appId) { + super.finishApplication(appId); + } + + public int getCompletedAppsListSize() { + return super.getCompletedAppsListSize(); + } + + public int getNumberOfCompletedAppsInStateStore() { + return this.completedAppsInStateStore; + } + + public List<ApplicationId> getCompletedApps() { + return completedApps; + } + + public Set<ApplicationId> getFirstNCompletedApps(int n) { + return getCompletedApps().stream().limit(n).collect(toSet()); + } + + public Set<ApplicationId> getCompletedAppsWithEvenIdsInRange(int n) { + return getCompletedApps().stream().limit(n) + .filter(app -> app.getId() % 2 == 0).collect(toSet()); + } + + public Set<ApplicationId> getRemovedAppsFromStateStore(int numRemoves) { + ArgumentCaptor<RMApp> argumentCaptor = + ArgumentCaptor.forClass(RMApp.class); + verify(stateStore, times(numRemoves)) + .removeApplication(argumentCaptor.capture()); + return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId) + .collect(toSet()); + } + + public void submitApplication( + ApplicationSubmissionContext submissionContext, String user) + throws YarnException { + super.submitApplication(submissionContext, System.currentTimeMillis(), + user); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index df86f28..408cb18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -513,6 +513,14 @@ public class MockRM extends ResourceManager { return submitApp(masterMemory, false); } + public RMApp submitApp(int masterMemory, String queue) throws Exception { + return submitApp(masterMemory, "", + UserGroupInformation.getCurrentUser().getShortUserName(), null, false, + queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), + null); + } + public RMApp submitApp(int masterMemory, Set<String> appTags) throws Exception { Resource resource = Resource.newInstance(masterMemory, 0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 0bd5372..8f48293 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -68,7 +68,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; @@ -82,12 +81,10 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; -import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -99,10 +96,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import static java.util.stream.Collectors.toSet; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.isA; import static org.mockito.Matchers.matches; import static org.mockito.Mockito.doAnswer; @@ -117,7 +114,7 @@ import static org.mockito.Mockito.when; * */ -public class TestAppManager{ +public class TestAppManager extends AppManagerTestBase{ private Log LOG = LogFactory.getLog(TestAppManager.class); private static RMAppEventType appEventType = RMAppEventType.KILL; @@ -234,70 +231,6 @@ public class TestAppManager{ setAppEventType(event.getType()); System.out.println("in handle routine " + getAppEventType().toString()); } - } - - - // Extend and make the functions we want to test public - public class TestRMAppManager extends RMAppManager { - private final RMStateStore stateStore; - - public TestRMAppManager(RMContext context, Configuration conf) { - super(context, null, null, new ApplicationACLsManager(conf), conf); - this.stateStore = context.getStateStore(); - } - - public TestRMAppManager(RMContext context, - ClientToAMTokenSecretManagerInRM clientToAMSecretManager, - YarnScheduler scheduler, ApplicationMasterService masterService, - ApplicationACLsManager applicationACLsManager, Configuration conf) { - super(context, scheduler, masterService, applicationACLsManager, conf); - this.stateStore = context.getStateStore(); - } - - public void checkAppNumCompletedLimit() { - super.checkAppNumCompletedLimit(); - } - - public void finishApplication(ApplicationId appId) { - super.finishApplication(appId); - } - - public int getCompletedAppsListSize() { - return super.getCompletedAppsListSize(); - } - - public int getNumberOfCompletedAppsInStateStore() { - return this.completedAppsInStateStore; - } - - List<ApplicationId> getCompletedApps() { - return completedApps; - } - - Set<ApplicationId> getFirstNCompletedApps(int n) { - return getCompletedApps().stream().limit(n).collect(toSet()); - } - - Set<ApplicationId> getCompletedAppsWithEvenIdsInRange(int n) { - return getCompletedApps().stream().limit(n) - .filter(app -> app.getId() % 2 == 0).collect(toSet()); - } - - Set<ApplicationId> getRemovedAppsFromStateStore(int numRemoves) { - ArgumentCaptor<RMApp> argumentCaptor = - ArgumentCaptor.forClass(RMApp.class); - verify(stateStore, times(numRemoves)) - .removeApplication(argumentCaptor.capture()); - return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId) - .collect(toSet()); - } - - public void submitApplication( - ApplicationSubmissionContext submissionContext, String user) - throws YarnException, IOException { - super.submitApplication(submissionContext, System.currentTimeMillis(), - user); - } } private void addToCompletedApps(TestRMAppManager appMonitor, @@ -1213,17 +1146,21 @@ public class TestAppManager{ Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + when(scheduler.getMaximumResourceCapability(anyString())).thenReturn( + Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + ResourceCalculator rs = mock(ResourceCalculator.class); when(scheduler.getResourceCalculator()).thenReturn(rs); - when(scheduler.getNormalizedResource(any())) + when(scheduler.getNormalizedResource(any(), any())) .thenAnswer(new Answer<Resource>() { - @Override - public Resource answer(InvocationOnMock invocationOnMock) - throws Throwable { - return (Resource) invocationOnMock.getArguments()[0]; - } - }); + @Override + public Resource answer(InvocationOnMock invocationOnMock) + throws Throwable { + return (Resource) invocationOnMock.getArguments()[0]; + } + }); return scheduler; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java new file mode 100644 index 0000000..feb7ed2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException.InvalidResourceType.GREATER_THEN_MAX_ALLOCATION; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.HashMap; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.MockApps; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Testing applications being retired from RM with fair scheduler. + * + */ +public class TestAppManagerWithFairScheduler extends AppManagerTestBase{ + + private static final String TEST_FOLDER = "test-queues"; + + private static YarnConfiguration conf = new YarnConfiguration(); + + @BeforeClass + public static void setup() throws IOException { + String allocFile = + GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath(); + + int queueMaxAllocation = 512; + + PrintWriter out = new PrintWriter(new FileWriter(allocFile)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println(" <queue name=\"queueA\">"); + out.println(" <maxContainerAllocation>" + queueMaxAllocation + + " mb 1 vcores" + "</maxContainerAllocation>"); + out.println(" </queue>"); + out.println(" <queue name=\"queueB\">"); + out.println(" </queue>"); + out.println("</allocations>"); + out.close(); + + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile); + } + + @AfterClass + public static void teardown(){ + File allocFile = GenericTestUtils.getTestDir(TEST_FOLDER); + allocFile.delete(); + } + + @Test + public void testQueueSubmitWithHighQueueContainerSize() + throws YarnException { + + ApplicationId appId = MockApps.newAppID(1); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + Resource resource = Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + + ApplicationSubmissionContext asContext = + recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + asContext.setApplicationId(appId); + asContext.setResource(resource); + asContext.setPriority(Priority.newInstance(0)); + asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); + asContext.setQueue("queueA"); + QueueInfo mockDefaultQueueInfo = mock(QueueInfo.class); + + // Setup a PlacementManager returns a new queue + PlacementManager placementMgr = mock(PlacementManager.class); + doAnswer(new Answer<ApplicationPlacementContext>() { + + @Override + public ApplicationPlacementContext answer(InvocationOnMock invocation) + throws Throwable { + return new ApplicationPlacementContext("queueA"); + } + + }).when(placementMgr).placeApplication( + any(ApplicationSubmissionContext.class), matches("test1")); + doAnswer(new Answer<ApplicationPlacementContext>() { + + @Override + public ApplicationPlacementContext answer(InvocationOnMock invocation) + throws Throwable { + return new ApplicationPlacementContext("queueB"); + } + + }).when(placementMgr).placeApplication( + any(ApplicationSubmissionContext.class), matches("test2")); + + MockRM newMockRM = new MockRM(conf); + RMContext newMockRMContext = newMockRM.getRMContext(); + newMockRMContext.setQueuePlacementManager(placementMgr); + ApplicationMasterService masterService = new ApplicationMasterService( + newMockRMContext, newMockRMContext.getScheduler()); + + TestRMAppManager newAppMonitor = new TestRMAppManager(newMockRMContext, + new ClientToAMTokenSecretManagerInRM(), newMockRMContext.getScheduler(), + masterService, new ApplicationACLsManager(conf), conf); + + // only user test has permission to submit to 'test' queue + + try { + newAppMonitor.submitApplication(asContext, "test1"); + Assert.fail("Test should fail on too high allocation!"); + } catch (InvalidResourceRequestException e) { + Assert.assertEquals(GREATER_THEN_MAX_ALLOCATION, + e.getInvalidResourceType()); + } + + // Should not throw exception + newAppMonitor.submitApplication(asContext, "test2"); + } + + private static ContainerLaunchContext mockContainerLaunchContext( + RecordFactory recordFactory) { + ContainerLaunchContext amContainer = recordFactory.newRecordInstance( + ContainerLaunchContext.class); + amContainer + .setApplicationACLs(new HashMap<ApplicationAccessType, String>()); + return amContainer; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 61048a5..3fe46b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -362,9 +362,9 @@ public class TestClientRMService { @Test public void testGetApplicationReport() throws Exception { - YarnScheduler yarnScheduler = mock(YarnScheduler.class); + ResourceScheduler scheduler = mock(ResourceScheduler.class); RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); + mockRMContext(scheduler, rmContext); ApplicationId appId1 = getApplicationId(1); @@ -373,7 +373,7 @@ public class TestClientRMService { mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(), ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true); - ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, + ClientRMService rmService = new ClientRMService(rmContext, scheduler, null, mockAclsManager, null, null); try { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -456,9 +456,9 @@ public class TestClientRMService { public void testGetApplicationResourceUsageReportDummy() throws YarnException, IOException { ApplicationAttemptId attemptId = getApplicationAttemptId(1); - YarnScheduler yarnScheduler = mockYarnScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); + mockRMContext(scheduler, rmContext); when(rmContext.getDispatcher().getEventHandler()).thenReturn( new EventHandler<Event>() { public void handle(Event event) { @@ -468,7 +468,7 @@ public class TestClientRMService { mock(ApplicationSubmissionContext.class); YarnConfiguration config = new YarnConfiguration(); RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, - rmContext, yarnScheduler, null, asContext, config, null, null); + rmContext, scheduler, null, asContext, config, null, null); ApplicationResourceUsageReport report = rmAppAttemptImpl .getApplicationResourceUsageReport(); assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT); @@ -537,14 +537,14 @@ public class TestClientRMService { } public ClientRMService createRMService() throws IOException, YarnException { - YarnScheduler yarnScheduler = mockYarnScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); + mockRMContext(scheduler, rmContext); ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, - yarnScheduler); + scheduler); when(rmContext.getRMApps()).thenReturn(apps); when(rmContext.getYarnConfiguration()).thenReturn(new Configuration()); - RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, null, + RMAppManager appManager = new RMAppManager(rmContext, scheduler, null, mock(ApplicationACLsManager.class), new Configuration()); when(rmContext.getDispatcher().getEventHandler()).thenReturn( new EventHandler<Event>() { @@ -558,7 +558,7 @@ public class TestClientRMService { mockQueueACLsManager.checkAccess(any(UserGroupInformation.class), any(QueueACL.class), any(RMApp.class), any(String.class), any())).thenReturn(true); - return new ClientRMService(rmContext, yarnScheduler, appManager, + return new ClientRMService(rmContext, scheduler, appManager, mockAclsManager, mockQueueACLsManager, null); } @@ -907,9 +907,9 @@ public class TestClientRMService { @Test public void testGetQueueInfo() throws Exception { - YarnScheduler yarnScheduler = mock(YarnScheduler.class); + ResourceScheduler scheduler = mock(ResourceScheduler.class); RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); + mockRMContext(scheduler, rmContext); ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class); QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class); @@ -921,7 +921,7 @@ public class TestClientRMService { any(ApplicationAccessType.class), anyString(), any(ApplicationId.class))).thenReturn(true); - ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, + ClientRMService rmService = new ClientRMService(rmContext, scheduler, null, mockAclsManager, mockQueueACLsManager, null); GetQueueInfoRequest request = recordFactory .newRecordInstance(GetQueueInfoRequest.class); @@ -960,7 +960,7 @@ public class TestClientRMService { any(ApplicationAccessType.class), anyString(), any(ApplicationId.class))).thenReturn(false); - ClientRMService rmService1 = new ClientRMService(rmContext, yarnScheduler, + ClientRMService rmService1 = new ClientRMService(rmContext, scheduler, null, mockAclsManager1, mockQueueACLsManager1, null); request.setQueueName("testqueue"); request.setIncludeApplications(true); @@ -974,12 +974,12 @@ public class TestClientRMService { @Test (timeout = 30000) @SuppressWarnings ("rawtypes") public void testAppSubmit() throws Exception { - YarnScheduler yarnScheduler = mockYarnScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); + mockRMContext(scheduler, rmContext); RMStateStore stateStore = mock(RMStateStore.class); when(rmContext.getStateStore()).thenReturn(stateStore); - RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, + RMAppManager appManager = new RMAppManager(rmContext, scheduler, null, mock(ApplicationACLsManager.class), new Configuration()); when(rmContext.getDispatcher().getEventHandler()).thenReturn( new EventHandler<Event>() { @@ -1001,7 +1001,7 @@ public class TestClientRMService { any())) .thenReturn(true); ClientRMService rmService = - new ClientRMService(rmContext, yarnScheduler, appManager, + new ClientRMService(rmContext, scheduler, appManager, mockAclsManager, mockQueueACLsManager, null); rmService.init(new Configuration()); @@ -1085,15 +1085,15 @@ public class TestClientRMService { * 2. Test each of the filters */ // Basic setup - YarnScheduler yarnScheduler = mockYarnScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); + mockRMContext(scheduler, rmContext); RMStateStore stateStore = mock(RMStateStore.class); when(rmContext.getStateStore()).thenReturn(stateStore); doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) .getRMTimelineCollectorManager(); - RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, + RMAppManager appManager = new RMAppManager(rmContext, scheduler, null, mock(ApplicationACLsManager.class), new Configuration()); when(rmContext.getDispatcher().getEventHandler()).thenReturn( new EventHandler<Event>() { @@ -1107,7 +1107,7 @@ public class TestClientRMService { any())) .thenReturn(true); ClientRMService rmService = - new ClientRMService(rmContext, yarnScheduler, appManager, + new ClientRMService(rmContext, scheduler, appManager, mockAclsManager, mockQueueACLsManager, null); rmService.init(new Configuration()); @@ -1238,12 +1238,12 @@ public class TestClientRMService { public void testConcurrentAppSubmit() throws IOException, InterruptedException, BrokenBarrierException, YarnException { - YarnScheduler yarnScheduler = mockYarnScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); + mockRMContext(scheduler, rmContext); RMStateStore stateStore = mock(RMStateStore.class); when(rmContext.getStateStore()).thenReturn(stateStore); - RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, + RMAppManager appManager = new RMAppManager(rmContext, scheduler, null, mock(ApplicationACLsManager.class), new Configuration()); final ApplicationId appId1 = getApplicationId(100); @@ -1280,7 +1280,7 @@ public class TestClientRMService { .getRMTimelineCollectorManager(); final ClientRMService rmService = - new ClientRMService(rmContext, yarnScheduler, appManager, null, null, + new ClientRMService(rmContext, scheduler, appManager, null, null, null); rmService.init(new Configuration()); @@ -1339,7 +1339,7 @@ public class TestClientRMService { return submitRequest; } - private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext) + private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext) throws IOException { Dispatcher dispatcher = mock(Dispatcher.class); when(rmContext.getDispatcher()).thenReturn(dispatcher); @@ -1361,22 +1361,21 @@ public class TestClientRMService { queueConfigsByPartition.put("*", queueConfigs); queInfo.setQueueConfigurations(queueConfigsByPartition); - when(yarnScheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean())) + when(scheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean())) .thenReturn(queInfo); - when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean())) - .thenThrow(new IOException("queue does not exist")); + when(scheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), + anyBoolean())).thenThrow(new IOException("queue does not exist")); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration()); - ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, - yarnScheduler); + ConcurrentHashMap<ApplicationId, RMApp> apps = + getRMApps(rmContext, scheduler); when(rmContext.getRMApps()).thenReturn(apps); - when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn( + when(scheduler.getAppsInQueue(eq("testqueue"))).thenReturn( getSchedulerApps(apps)); - ResourceScheduler rs = mock(ResourceScheduler.class); - when(rmContext.getScheduler()).thenReturn(rs); + when(rmContext.getScheduler()).thenReturn(scheduler); } private ConcurrentHashMap<ApplicationId, RMApp> getRMApps( @@ -1480,31 +1479,32 @@ public class TestClientRMService { return app; } - private static YarnScheduler mockYarnScheduler() throws YarnException { - YarnScheduler yarnScheduler = mock(YarnScheduler.class); - when(yarnScheduler.getMinimumResourceCapability()).thenReturn( + private static ResourceScheduler mockResourceScheduler() + throws YarnException { + ResourceScheduler scheduler = mock(ResourceScheduler.class); + when(scheduler.getMinimumResourceCapability()).thenReturn( Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - when(yarnScheduler.getMaximumResourceCapability()).thenReturn( + when(scheduler.getMaximumResourceCapability()).thenReturn( Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); - when(yarnScheduler.getMaximumResourceCapability(any(String.class))) - .thenReturn(Resources.createResource( + when(scheduler.getMaximumResourceCapability(anyString())).thenReturn( + Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); - when(yarnScheduler.getAppsInQueue(QUEUE_1)).thenReturn( + when(scheduler.getAppsInQueue(QUEUE_1)).thenReturn( Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102))); - when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn( + when(scheduler.getAppsInQueue(QUEUE_2)).thenReturn( Arrays.asList(getApplicationAttemptId(103))); ApplicationAttemptId attemptId = getApplicationAttemptId(1); - when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null); + when(scheduler.getAppResourceUsageReport(attemptId)).thenReturn(null); ResourceCalculator rs = mock(ResourceCalculator.class); - when(yarnScheduler.getResourceCalculator()).thenReturn(rs); + when(scheduler.getResourceCalculator()).thenReturn(rs); - when(yarnScheduler.checkAndGetApplicationPriority(any(Priority.class), + when(scheduler.checkAndGetApplicationPriority(any(Priority.class), any(UserGroupInformation.class), anyString(), any(ApplicationId.class))) .thenReturn(Priority.newInstance(0)); - return yarnScheduler; + return scheduler; } private ResourceManager setupResourceManager() { @@ -2427,15 +2427,15 @@ public class TestClientRMService { * Submit 3 applications alternately in two queues */ // Basic setup - YarnScheduler yarnScheduler = mockYarnScheduler(); + ResourceScheduler scheduler = mockResourceScheduler(); RMContext rmContext = mock(RMContext.class); - mockRMContext(yarnScheduler, rmContext); + mockRMContext(scheduler, rmContext); RMStateStore stateStore = mock(RMStateStore.class); when(rmContext.getStateStore()).thenReturn(stateStore); doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext) .getRMTimelineCollectorManager(); - RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, null, + RMAppManager appManager = new RMAppManager(rmContext, scheduler, null, mock(ApplicationACLsManager.class), new Configuration()); when(rmContext.getDispatcher().getEventHandler()) .thenReturn(new EventHandler<Event>() { @@ -2454,7 +2454,7 @@ public class TestClientRMService { when(appAclsManager.checkAccess(eq(UserGroupInformation.getCurrentUser()), any(ApplicationAccessType.class), any(String.class), any(ApplicationId.class))).thenReturn(false); - ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler, + ClientRMService rmService = new ClientRMService(rmContext, scheduler, appManager, appAclsManager, queueAclsManager, null); rmService.init(new Configuration()); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org