YARN-7574. Add support for Node Labels on Auto Created Leaf Queue Template. Contributed by Suma Shivaprasad.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/821b0de4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/821b0de4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/821b0de4 Branch: refs/heads/HDFS-7240 Commit: 821b0de4c59156d4a65112de03ba3e7e1c88e309 Parents: 5700556 Author: Sunil G <sun...@apache.org> Authored: Mon Apr 9 21:17:22 2018 +0530 Committer: Sunil G <sun...@apache.org> Committed: Mon Apr 9 21:17:22 2018 +0530 ---------------------------------------------------------------------- .../server/resourcemanager/RMServerUtils.java | 5 +- .../rmapp/attempt/RMAppAttemptImpl.java | 47 ++ .../resourcemanager/scheduler/Allocation.java | 12 + .../scheduler/SchedulerUtils.java | 33 +- .../capacity/AutoCreatedLeafQueue.java | 3 +- .../AutoCreatedQueueManagementPolicy.java | 12 +- .../scheduler/capacity/CapacityScheduler.java | 2 + .../CapacitySchedulerConfiguration.java | 28 + .../scheduler/capacity/LeafQueue.java | 11 + .../scheduler/capacity/ManagedParentQueue.java | 5 +- .../GuaranteedOrZeroCapacityOverTimePolicy.java | 573 +++++++++++-------- .../placement/PendingAskUpdateResult.java | 8 + .../yarn/server/resourcemanager/MockNM.java | 15 + .../server/resourcemanager/TestAppManager.java | 20 +- ...stCapacitySchedulerAutoCreatedQueueBase.java | 241 +++++--- .../TestCapacitySchedulerAutoQueueCreation.java | 233 +++++--- .../TestQueueManagementDynamicEditPolicy.java | 30 +- 17 files changed, 834 insertions(+), 444 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 33451295..ab6bbcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -236,13 +236,14 @@ public class RMServerUtils { */ public static void normalizeAndValidateRequests(List<ResourceRequest> ask, Resource maximumResource, String queueName, YarnScheduler scheduler, - RMContext rmContext) - throws InvalidResourceRequestException { + RMContext rmContext) throws InvalidResourceRequestException { // Get queue from scheduler QueueInfo queueInfo = null; try { queueInfo = scheduler.getQueueInfo(queueName, false, false); } catch (IOException e) { + //Queue may not exist since it could be auto-created in case of + // dynamic queues } for (ResourceRequest resReq : ask) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index c23b135..1b1e2c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; @@ -75,6 +76,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; + +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels + .RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -1109,6 +1113,49 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { amBlacklist.getBlacklistAdditions() + ") and removals(" + amBlacklist.getBlacklistRemovals() + ")"); } + + QueueInfo queueInfo = null; + for 
(ResourceRequest amReq : appAttempt.amReqs) { + if (amReq.getNodeLabelExpression() == null && ResourceRequest.ANY + .equals(amReq.getResourceName())) { + String queue = appAttempt.rmApp.getQueue(); + + //Load queue only once since queue will be same across attempts + if (queueInfo == null) { + try { + queueInfo = appAttempt.scheduler.getQueueInfo(queue, false, + false); + } catch (IOException e) { + LOG.error("Could not find queue for application : ", e); + // Set application status to REJECTED since we cant find the + // queue + appAttempt.rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(appAttempt.getAppAttemptId(), + RMAppAttemptEventType.FAIL, + "Could not find queue for application : " + + appAttempt.rmApp.getQueue())); + appAttempt.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(appAttempt.rmApp.getApplicationId(), RMAppEventType + .APP_REJECTED, + "Could not find queue for application : " + + appAttempt.rmApp.getQueue())); + return RMAppAttemptState.FAILED; + } + } + + String labelExp = RMNodeLabelsManager.NO_LABEL; + if (queueInfo != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting default node label expression : " + queueInfo + .getDefaultNodeLabelExpression()); + } + labelExp = queueInfo.getDefaultNodeLabelExpression(); + } + + amReq.setNodeLabelExpression(labelExp); + } + } + // AM resource has been checked when submission Allocation amContainerAllocation = appAttempt.scheduler.allocate( http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index 768afde..9573ac8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -132,4 +132,16 @@ public class Allocation { public void setResourceLimit(Resource resource) { this.resourceLimit = resource; } + + @Override + public String toString() { + return "Allocation{" + "containers=" + containers + ", strictContainers=" + + strictContainers + ", fungibleContainers=" + fungibleContainers + + ", fungibleResources=" + fungibleResources + ", nmTokens=" + nmTokens + + ", increasedContainers=" + increasedContainers + + ", decreasedContainers=" + decreasedContainers + + ", promotedContainers=" + promotedContainers + ", demotedContainers=" + + demotedContainers + ", previousAttemptContainers=" + + previousAttemptContainers + ", resourceLimit=" + resourceLimit + '}'; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 27563d6..c0d7d86 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -186,19 +186,33 @@ public class SchedulerUtils { ResourceRequest resReq, QueueInfo queueInfo) { String labelExp = resReq.getNodeLabelExpression(); + if (LOG.isDebugEnabled()) { + LOG.debug("Requested Node Label Expression : " + labelExp); + LOG.debug("Queue Info : " + queueInfo); + } // if queue has default label expression, and RR doesn't have, use the // default label expression of queue if (labelExp == null && queueInfo != null && ResourceRequest.ANY .equals(resReq.getResourceName())) { + if ( LOG.isDebugEnabled()) { + LOG.debug("Setting default node label expression : " + queueInfo + .getDefaultNodeLabelExpression()); + } labelExp = queueInfo.getDefaultNodeLabelExpression(); } - // If labelExp still equals to null, set it to be NO_LABEL - if (labelExp == null) { + // If labelExp still equals to null, it could either be a dynamic queue + // or the label is not configured + // set it to be NO_LABEL in case of a pre-configured queue. 
Dynamic + // queues are handled in RMAppAttemptImpl.ScheduledTransition + if (labelExp == null && queueInfo != null) { labelExp = RMNodeLabelsManager.NO_LABEL; } - resReq.setNodeLabelExpression(labelExp); + + if ( labelExp != null) { + resReq.setNodeLabelExpression(labelExp); + } } public static void normalizeAndValidateRequest(ResourceRequest resReq, @@ -209,6 +223,7 @@ public class SchedulerUtils { isRecovery, rmContext, null); } + public static void normalizeAndValidateRequest(ResourceRequest resReq, Resource maximumResource, String queueName, YarnScheduler scheduler, boolean isRecovery, RMContext rmContext, QueueInfo queueInfo) @@ -233,11 +248,12 @@ public class SchedulerUtils { try { queueInfo = scheduler.getQueueInfo(queueName, false, false); } catch (IOException e) { - // it is possible queue cannot get when queue mapping is set, just ignore - // the queueInfo here, and move forward + //Queue may not exist since it could be auto-created in case of + // dynamic queues } } SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo); + if (!isRecovery) { validateResourceRequest(resReq, maximumResource, queueInfo, rmContext); } @@ -245,8 +261,7 @@ public class SchedulerUtils { public static void normalizeAndvalidateRequest(ResourceRequest resReq, Resource maximumResource, String queueName, YarnScheduler scheduler, - RMContext rmContext) - throws InvalidResourceRequestException { + RMContext rmContext) throws InvalidResourceRequestException { normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler, rmContext, null); } @@ -296,7 +311,7 @@ public class SchedulerUtils { + "resource request has resource name = " + resReq.getResourceName()); } - + // we don't allow specify label expression with more than one node labels now if (labelExp != null && labelExp.contains("&&")) { throw new InvalidLabelResourceRequestException( @@ -305,7 +320,7 @@ public class SchedulerUtils { + "in a node label expression, node label expression = " + labelExp); 
} - + if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) { if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(), labelExp, rmContext)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index 8b67087..e12b55e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -148,11 +148,10 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue { try { for( String nodeLabel : parent.getQueueCapacities().getExistingNodeLabels ()) { - //TODO - update to use getMaximumCapacity(nodeLabel) in YARN-7574 setEntitlement(nodeLabel, new QueueEntitlement(0.0f, parent.getLeafQueueTemplate() .getQueueCapacities() - .getMaximumCapacity())); + .getMaximumCapacity(nodeLabel))); } } catch (SchedulerDynamicEditException e) { throw new IOException(e); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java index f7a4bbd..388e9d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + + +import java.io.IOException; import java.util.List; public interface AutoCreatedQueueManagementPolicy { @@ -26,14 +29,15 @@ public interface AutoCreatedQueueManagementPolicy { * Initialize policy * @param schedulerContext Capacity Scheduler context */ - void init(CapacitySchedulerContext schedulerContext, ParentQueue parentQueue); + void init(CapacitySchedulerContext schedulerContext, ParentQueue + parentQueue) throws IOException; /** * Reinitialize policy state ( if required ) * @param schedulerContext Capacity Scheduler context */ void reinitialize(CapacitySchedulerContext schedulerContext, - 
ParentQueue parentQueue); + ParentQueue parentQueue) throws IOException; /** * Get initial template for the specified leaf queue @@ -48,6 +52,10 @@ public interface AutoCreatedQueueManagementPolicy { /** * Compute/Adjust child queue capacities * for auto created leaf queues + * This computes queue entitlements but does not update LeafQueueState or + * queue capacities. Scheduler calls commitQueueManagementChanges after + * validation after applying queue changes and commits to LeafQueueState + * are done in commitQueueManagementChanges. * * @return returns a list of suggested QueueEntitlementChange(s) which may * or may not be enforced by the scheduler http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e59bdde..776e512 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1204,6 +1204,8 @@ public class CapacityScheduler extends updateDemandForQueue.getOrderingPolicy().demandUpdated(application); } + LOG.info("Allocation for application " + applicationAttemptId + " : " + + 
allocation + " with cluster resource : " + getClusterResource()); return allocation; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 1870aef..c41bd96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1878,6 +1878,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur setCapacity(leafQueueConfPrefix, val); } + @VisibleForTesting + @Private + public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath, + String label, float val) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setCapacityByLabel(leafQueueConfPrefix, label, val); + } + @Private @VisibleForTesting public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath, @@ -1887,6 +1896,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur setMaximumCapacity(leafQueueConfPrefix, val); } + @Private + @VisibleForTesting + 
public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath, + String label, float val) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setMaximumCapacityByLabel(leafQueueConfPrefix, label, val); + } + @VisibleForTesting @Private public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath, @@ -1905,6 +1923,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur setUserLimitFactor(leafQueueConfPrefix, val); } + @Private + @VisibleForTesting + public void setAutoCreatedLeafQueueConfigDefaultNodeLabelExpression(String + queuePath, + String expression) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setDefaultNodeLabelExpression(leafQueueConfPrefix, expression); + } + public static String getUnits(String resourceValue) { String units; for (int i = 0; i < resourceValue.length(); i++) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 8d1428d..1ae8f91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -778,6 +778,17 @@ public class LeafQueue extends AbstractCSQueue { metrics.setAMResouceLimit(nodePartition, amResouceLimit); queueUsage.setAMLimit(nodePartition, amResouceLimit); + if(LOG.isDebugEnabled()) { + LOG.debug("Queue: " + getQueueName() + ", node label : " + + nodePartition + + ", queue " + + "partition " + + "resource : " + queuePartitionResource + ',' + + " queue current limit : " + queueCurrentLimit + "," + + " queue partition usable resource : " + + queuePartitionUsableResource + "," + + " amResourceLimit : " + amResouceLimit); + } return amResouceLimit; } finally { writeLock.unlock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java index cbdb21d..2494000 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -132,7 +132,7 @@ public class ManagedParentQueue extends 
AbstractManagedParentQueue { } } - private void initializeQueueManagementPolicy() { + private void initializeQueueManagementPolicy() throws IOException { queueManagementPolicy = csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass( getQueuePath()); @@ -140,7 +140,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue { queueManagementPolicy.init(csContext, this); } - private void reinitializeQueueManagementPolicy() { + private void reinitializeQueueManagementPolicy() throws IOException { AutoCreatedQueueManagementPolicy managementPolicy = csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass( getQueuePath()); @@ -339,6 +339,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue { ((AutoCreatedLeafQueue) childQueue).validateConfigurations(template); break; } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java index aee6405..b2301fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.MonotonicClock; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -63,8 +64,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager - .NO_LABEL; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler .capacity.CSQueueUtils.EPSILON; @@ -85,8 +84,6 @@ public class GuaranteedOrZeroCapacityOverTimePolicy private static final Log LOG = LogFactory.getLog( GuaranteedOrZeroCapacityOverTimePolicy.class); - private AutoCreatedLeafQueueConfig ZERO_CAPACITY_ENTITLEMENT; - private ReentrantReadWriteLock.WriteLock writeLock; private ReentrantReadWriteLock.ReadLock readLock; @@ -97,12 +94,70 @@ public class GuaranteedOrZeroCapacityOverTimePolicy private QueueCapacities leafQueueTemplateCapacities; - private Map<String, LeafQueueState> leafQueueStateMap = new HashMap<>(); + private Set<String> leafQueueTemplateNodeLabels; + + private LeafQueueState leafQueueState = new LeafQueueState(); private Clock clock = new MonotonicClock(); private class LeafQueueState { + //map of partition-> queueName->{leaf queue's state} + private Map<String, Map<String, LeafQueueStatePerPartition>> + leafQueueStateMap = new HashMap<>(); + + public boolean containsLeafQueue(String leafQueueName, String partition) { + if (leafQueueStateMap.containsKey(partition)) { + return leafQueueStateMap.get(partition).containsKey(leafQueueName); + } + return false; + } + + private boolean 
containsPartition(String partition) { + if (leafQueueStateMap.containsKey(partition)) { + return true; + } + return false; + } + + private boolean addLeafQueueStateIfNotExists(String leafQueueName, + String partition, LeafQueueStatePerPartition leafQueueState) { + if (!containsPartition(partition)) { + leafQueueStateMap.put(partition, new HashMap<>()); + } + if (!containsLeafQueue(leafQueueName, partition)) { + leafQueueStateMap.get(partition).put(leafQueueName, leafQueueState); + return true; + } + return false; + } + + public boolean createLeafQueueStateIfNotExists(LeafQueue leafQueue, + String partition) { + return addLeafQueueStateIfNotExists(leafQueue.getQueueName(), partition, + new LeafQueueStatePerPartition()); + } + + public LeafQueueStatePerPartition getLeafQueueStatePerPartition( + String leafQueueName, String partition) { + if (leafQueueStateMap.get(partition) != null) { + return leafQueueStateMap.get(partition).get(leafQueueName); + } + return null; + } + + public Map<String, Map<String, LeafQueueStatePerPartition>> + getLeafQueueStateMap() { + return leafQueueStateMap; + } + + private void clear() { + leafQueueStateMap.clear(); + } + } + + private class LeafQueueStatePerPartition { + private AtomicBoolean isActive = new AtomicBoolean(false); private long mostRecentActivationTime; @@ -139,41 +194,16 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } } - private boolean containsLeafQueue(String leafQueueName) { - return leafQueueStateMap.containsKey(leafQueueName); - } - - private boolean addLeafQueueStateIfNotExists(String leafQueueName, - LeafQueueState leafQueueState) { - if (!containsLeafQueue(leafQueueName)) { - leafQueueStateMap.put(leafQueueName, leafQueueState); - return true; - } - return false; - } - - private boolean addLeafQueueStateIfNotExists(LeafQueue leafQueue) { - return addLeafQueueStateIfNotExists(leafQueue.getQueueName(), - new LeafQueueState()); - } - - private void clearLeafQueueState() { - leafQueueStateMap.clear(); - } - 
private class ParentQueueState { private Map<String, Float> totalAbsoluteActivatedChildQueueCapacityByLabel = new HashMap<String, Float>(); - private float getAbsoluteActivatedChildQueueCapacity() { - return getAbsoluteActivatedChildQueueCapacity(NO_LABEL); - } - private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) { try { readLock.lock(); - Float totalActivatedCapacity = getByLabel(nodeLabel); + Float totalActivatedCapacity = getAbsActivatedChildQueueCapacityByLabel( + nodeLabel); if (totalActivatedCapacity != null) { return totalActivatedCapacity; } else{ @@ -188,11 +218,14 @@ public class GuaranteedOrZeroCapacityOverTimePolicy float childQueueCapacity) { try { writeLock.lock(); - Float activatedChildCapacity = getByLabel(nodeLabel); + Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel( + nodeLabel); if (activatedChildCapacity != null) { - setByLabel(nodeLabel, activatedChildCapacity + childQueueCapacity); + setAbsActivatedChildQueueCapacityByLabel(nodeLabel, + activatedChildCapacity + childQueueCapacity); } else{ - setByLabel(nodeLabel, childQueueCapacity); + setAbsActivatedChildQueueCapacityByLabel(nodeLabel, + childQueueCapacity); } } finally { writeLock.unlock(); @@ -203,22 +236,25 @@ public class GuaranteedOrZeroCapacityOverTimePolicy float childQueueCapacity) { try { writeLock.lock(); - Float activatedChildCapacity = getByLabel(nodeLabel); + Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel( + nodeLabel); if (activatedChildCapacity != null) { - setByLabel(nodeLabel, activatedChildCapacity - childQueueCapacity); + setAbsActivatedChildQueueCapacityByLabel(nodeLabel, + activatedChildCapacity - childQueueCapacity); } else{ - setByLabel(nodeLabel, childQueueCapacity); + setAbsActivatedChildQueueCapacityByLabel(nodeLabel, + childQueueCapacity); } } finally { writeLock.unlock(); } } - Float getByLabel(String label) { + Float getAbsActivatedChildQueueCapacityByLabel(String label) { return 
totalAbsoluteActivatedChildQueueCapacityByLabel.get(label); } - Float setByLabel(String label, float val) { + Float setAbsActivatedChildQueueCapacityByLabel(String label, float val) { return totalAbsoluteActivatedChildQueueCapacityByLabel.put(label, val); } @@ -256,13 +292,12 @@ public class GuaranteedOrZeroCapacityOverTimePolicy @Override public void init(final CapacitySchedulerContext schedulerContext, - final ParentQueue parentQueue) { + final ParentQueue parentQueue) throws IOException { this.scheduler = schedulerContext; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); - if (!(parentQueue instanceof ManagedParentQueue)) { throw new IllegalArgumentException( "Expected instance of type " + ManagedParentQueue.class); @@ -278,15 +313,43 @@ public class GuaranteedOrZeroCapacityOverTimePolicy + leafQueueTemplate.getQueueCapacities() + "]"); } - private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) { + private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) + throws IOException { leafQueueTemplate = parentQueue.getLeafQueueTemplate(); leafQueueTemplateCapacities = leafQueueTemplate.getQueueCapacities(); - ZERO_CAPACITY_ENTITLEMENT = buildTemplate(0.0f, - leafQueueTemplateCapacities.getMaximumCapacity()); + Set<String> parentQueueLabels = parentQueue.getNodeLabelsForQueue(); + for (String nodeLabel : leafQueueTemplateCapacities + .getExistingNodeLabels()) { + + if (!parentQueueLabels.contains(nodeLabel)) { + LOG.error("Invalid node label " + nodeLabel + + " on configured leaf template on parent" + " queue " + parentQueue + .getQueueName()); + throw new IOException("Invalid node label " + nodeLabel + + " on configured leaf template on parent" + " queue " + parentQueue + .getQueueName()); + } + } + + leafQueueTemplateNodeLabels = + leafQueueTemplateCapacities.getExistingNodeLabels(); + } + /** + * Compute/Adjust child queue capacities + * for auto created leaf queues 
+ * This computes queue entitlements but does not update LeafQueueState or + * queue capacities. Scheduler calls commitQueueManagementChanges after + * validation after applying queue changes and commits to LeafQueueState + * are done in commitQueueManagementChanges. + * + * @return List of Queue Management change suggestions which could potentially + * be committed/rejected by the scheduler due to validation failures + * @throws SchedulerDynamicEditException + */ @Override public List<QueueManagementChange> computeQueueManagementChanges() throws SchedulerDynamicEditException { @@ -298,70 +361,92 @@ public class GuaranteedOrZeroCapacityOverTimePolicy try { readLock.lock(); List<QueueManagementChange> queueManagementChanges = new ArrayList<>(); + List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications(); + + //Map of LeafQueue->QueueCapacities - keep adding the computed + // entitlements to this map and finally + // build the leaf queue configuration Template for all identified leaf + // queues + Map<String, QueueCapacities> leafQueueEntitlements = new HashMap<>(); + for (String nodeLabel : leafQueueTemplateNodeLabels) { + // check if any leaf queues need to be deactivated based on pending + // applications + float parentAbsoluteCapacity = + managedParentQueue.getQueueCapacities().getAbsoluteCapacity( + nodeLabel); + float leafQueueTemplateAbsoluteCapacity = + leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel); + Map<String, QueueCapacities> deactivatedLeafQueues = + deactivateLeafQueuesIfInActive(managedParentQueue, nodeLabel, + leafQueueEntitlements); + + float deactivatedCapacity = getTotalDeactivatedCapacity( + deactivatedLeafQueues, nodeLabel); + + float sumOfChildQueueActivatedCapacity = parentQueueState. + getAbsoluteActivatedChildQueueCapacity(nodeLabel); + + //Check if we need to activate anything at all?
+ float availableCapacity = + parentAbsoluteCapacity - sumOfChildQueueActivatedCapacity + + deactivatedCapacity + EPSILON; - // check if any leaf queues need to be deactivated based on pending - // applications and - float parentAbsoluteCapacity = - managedParentQueue.getQueueCapacities().getAbsoluteCapacity(); - - float leafQueueTemplateAbsoluteCapacity = - leafQueueTemplateCapacities.getAbsoluteCapacity(); - Map<String, QueueCapacities> deactivatedLeafQueues = - deactivateLeafQueuesIfInActive(managedParentQueue, queueManagementChanges); - - float deactivatedCapacity = getTotalDeactivatedCapacity( - deactivatedLeafQueues); - - float sumOfChildQueueActivatedCapacity = parentQueueState. - getAbsoluteActivatedChildQueueCapacity(); - - //Check if we need to activate anything at all? - float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity, - deactivatedCapacity, sumOfChildQueueActivatedCapacity); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Parent queue : " + managedParentQueue.getQueueName() + " absCapacity = " - + parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = " - + leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = " - + deactivatedCapacity + " , absChildActivatedCapacity = " - + sumOfChildQueueActivatedCapacity + ", availableCapacity = " - + availableCapacity); - } - - if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) { - //sort applications across leaf queues by submit time - List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications(); + if (LOG.isDebugEnabled()) { + LOG.debug("Parent queue : " + managedParentQueue.getQueueName() + + ", nodeLabel = " + nodeLabel + ", absCapacity = " + + parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = " + + leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = " + + deactivatedCapacity + " , absChildActivatedCapacity = " + + sumOfChildQueueActivatedCapacity + ", availableCapacity = " + + availableCapacity); + } - if (pendingApps.size() > 0) { - int 
maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated( - availableCapacity, leafQueueTemplateAbsoluteCapacity, - pendingApps.size()); + if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) { + //sort applications across leaf queues by submit time + if (pendingApps.size() > 0) { + int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated( + availableCapacity, leafQueueTemplateAbsoluteCapacity, + pendingApps.size()); - if (LOG.isDebugEnabled()) { - LOG.debug("Found " + maxLeafQueuesTobeActivated - + " leaf queues to be activated with " + pendingApps.size() - + " apps "); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Found " + maxLeafQueuesTobeActivated + " leaf queues" + + " to be activated with " + pendingApps.size() + " apps "); + } - LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues( - pendingApps, maxLeafQueuesTobeActivated, - deactivatedLeafQueues.keySet()); + LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues( + nodeLabel, pendingApps, maxLeafQueuesTobeActivated, + deactivatedLeafQueues.keySet()); - //Compute entitlement changes for the identified leaf queues - // which is appended to the List of queueManagementChanges - computeQueueManagementChanges(leafQueuesToBeActivated, - queueManagementChanges, availableCapacity, - leafQueueTemplateAbsoluteCapacity); + //Compute entitlement changes for the identified leaf queues + // which is appended to the List of computedEntitlements + updateLeafQueueCapacitiesByLabel(nodeLabel, leafQueuesToBeActivated, + leafQueueEntitlements); - if (LOG.isDebugEnabled()) { - if (leafQueuesToBeActivated.size() > 0) { - LOG.debug( - "Activated leaf queues : [" + leafQueuesToBeActivated + "]"); + if (LOG.isDebugEnabled()) { + if (leafQueuesToBeActivated.size() > 0) { + LOG.debug("Activated leaf queues : [" + leafQueuesToBeActivated + + "]"); + } } } } } + + //Populate new entitlements + + for (final Iterator<Map.Entry<String, QueueCapacities>> iterator = + 
leafQueueEntitlements.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry<String, QueueCapacities> queueCapacities = iterator.next(); + String leafQueueName = queueCapacities.getKey(); + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager() + .getQueue(leafQueueName); + AutoCreatedLeafQueueConfig newTemplate = buildTemplate( + queueCapacities.getValue()); + queueManagementChanges.add( + new QueueManagementChange.UpdateQueue(leafQueue, newTemplate)); + + } return queueManagementChanges; } finally { readLock.unlock(); @@ -369,14 +454,14 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } private float getTotalDeactivatedCapacity( - Map<String, QueueCapacities> deactivatedLeafQueues) { + Map<String, QueueCapacities> deactivatedLeafQueues, String nodeLabel) { float deactivatedCapacity = 0; for (Iterator<Map.Entry<String, QueueCapacities>> iterator = deactivatedLeafQueues.entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<String, QueueCapacities> deactivatedQueueCapacity = iterator.next(); deactivatedCapacity += - deactivatedQueueCapacity.getValue().getAbsoluteCapacity(); + deactivatedQueueCapacity.getValue().getAbsoluteCapacity(nodeLabel); } return deactivatedCapacity; } @@ -385,20 +470,42 @@ public class GuaranteedOrZeroCapacityOverTimePolicy void updateLeafQueueState() { try { writeLock.lock(); + Set<String> newPartitions = new HashSet<>(); Set<String> newQueues = new HashSet<>(); + for (CSQueue newQueue : managedParentQueue.getChildQueues()) { if (newQueue instanceof LeafQueue) { - addLeafQueueStateIfNotExists((LeafQueue) newQueue); + for (String nodeLabel : leafQueueTemplateNodeLabels) { + leafQueueState.createLeafQueueStateIfNotExists((LeafQueue) newQueue, + nodeLabel); + newPartitions.add(nodeLabel); + } newQueues.add(newQueue.getQueueName()); } } - for (Iterator<Map.Entry<String, LeafQueueState>> itr = - leafQueueStateMap.entrySet().iterator(); itr.hasNext(); ) { - Map.Entry<String, 
LeafQueueState> e = itr.next(); - String queueName = e.getKey(); - if (!newQueues.contains(queueName)) { + for (Iterator<Map.Entry<String, Map<String, LeafQueueStatePerPartition>>> + itr = leafQueueState.getLeafQueueStateMap().entrySet().iterator(); + itr.hasNext(); ) { + Map.Entry<String, Map<String, LeafQueueStatePerPartition>> e = + itr.next(); + String partition = e.getKey(); + if (!newPartitions.contains(partition)) { itr.remove(); + LOG.info( + "Removed partition " + partition + " from leaf queue " + "state"); + } else{ + Map<String, LeafQueueStatePerPartition> queues = e.getValue(); + for ( + Iterator<Map.Entry<String, LeafQueueStatePerPartition>> queueItr = + queues.entrySet().iterator(); queueItr.hasNext(); ) { + String queue = queueItr.next().getKey(); + if (!newQueues.contains(queue)) { + queueItr.remove(); + LOG.info("Removed queue " + queue + " from leaf queue " + + "state from partition " + partition); + } + } } } } finally { @@ -406,22 +513,20 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } } - private LinkedHashSet<String> getSortedLeafQueues( + private LinkedHashSet<String> getSortedLeafQueues(String nodeLabel, final List<FiCaSchedulerApp> pendingApps, int leafQueuesNeeded, Set<String> deactivatedQueues) throws SchedulerDynamicEditException { LinkedHashSet<String> leafQueues = new LinkedHashSet<>(leafQueuesNeeded); int ctr = 0; for (FiCaSchedulerApp app : pendingApps) { - AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) app.getCSLeafQueue(); String leafQueueName = leafQueue.getQueueName(); //Check if leafQueue is not active already and has any pending apps if (ctr < leafQueuesNeeded) { - - if (!isActive(leafQueue)) { + if (!isActive(leafQueue, nodeLabel)) { if (!deactivatedQueues.contains(leafQueueName)) { if (addLeafQueueIfNotExists(leafQueues, leafQueueName)) { ctr++; @@ -445,11 +550,12 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } @VisibleForTesting - public boolean isActive(final AutoCreatedLeafQueue leafQueue) - 
throws SchedulerDynamicEditException { + public boolean isActive(final AutoCreatedLeafQueue leafQueue, + String nodeLabel) throws SchedulerDynamicEditException { try { readLock.lock(); - LeafQueueState leafQueueStatus = getLeafQueueState(leafQueue); + LeafQueueStatePerPartition leafQueueStatus = getLeafQueueState(leafQueue, + nodeLabel); return leafQueueStatus.isActive(); } finally { readLock.unlock(); @@ -457,64 +563,52 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive( - ParentQueue parentQueue, - List<QueueManagementChange> queueManagementChanges) + ParentQueue parentQueue, String nodeLabel, + Map<String, QueueCapacities> leafQueueEntitlements) throws SchedulerDynamicEditException { Map<String, QueueCapacities> deactivatedQueues = new HashMap<>(); for (CSQueue childQueue : parentQueue.getChildQueues()) { AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; + if (leafQueue != null) { + if (isActive(leafQueue, nodeLabel) && !hasPendingApps(leafQueue)) { + if (!leafQueueEntitlements.containsKey(leafQueue.getQueueName())) { + leafQueueEntitlements.put(leafQueue.getQueueName(), + new QueueCapacities(false)); + } - if (isActive(leafQueue) && !hasPendingApps(leafQueue)) { - queueManagementChanges.add( - new QueueManagementChange.UpdateQueue(leafQueue, - ZERO_CAPACITY_ENTITLEMENT)); - deactivatedQueues.put(leafQueue.getQueueName(), - leafQueueTemplateCapacities); - } else{ - if (LOG.isDebugEnabled()) { - LOG.debug(" Leaf queue has pending applications : " + leafQueue - .getNumApplications() + ".Skipping deactivation for " - + leafQueue); + QueueCapacities capacities = leafQueueEntitlements.get( + leafQueue.getQueueName()); + updateToZeroCapacity(capacities, nodeLabel); + deactivatedQueues.put(leafQueue.getQueueName(), + leafQueueTemplateCapacities); + } else{ + if (LOG.isDebugEnabled()) { + LOG.debug(" Leaf queue has pending applications or is " + "inactive" + + " : " + 
leafQueue.getNumApplications() + + ".Skipping deactivation for " + leafQueue); + } } + } else{ + LOG.warn("Could not find queue in scheduler while trying" + " to " + + "deactivate for " + parentQueue); } } - if (LOG.isDebugEnabled()) { - if (deactivatedQueues.size() > 0) { - LOG.debug("Deactivated leaf queues : " + deactivatedQueues); - } - } return deactivatedQueues; } - private void computeQueueManagementChanges( + private void updateLeafQueueCapacitiesByLabel(String nodeLabel, Set<String> leafQueuesToBeActivated, - List<QueueManagementChange> queueManagementChanges, - final float availableCapacity, - final float leafQueueTemplateAbsoluteCapacity) { - - float curAvailableCapacity = availableCapacity; - + Map<String, QueueCapacities> leafQueueEntitlements) { for (String curLeafQueue : leafQueuesToBeActivated) { - // Activate queues if capacity is available - if (curAvailableCapacity >= leafQueueTemplateAbsoluteCapacity) { - AutoCreatedLeafQueue leafQueue = - (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager() - .getQueue(curLeafQueue); - if (leafQueue != null) { - AutoCreatedLeafQueueConfig newTemplate = buildTemplate( - leafQueueTemplateCapacities.getCapacity(), - leafQueueTemplateCapacities.getMaximumCapacity()); - queueManagementChanges.add( - new QueueManagementChange.UpdateQueue(leafQueue, newTemplate)); - curAvailableCapacity -= leafQueueTemplateAbsoluteCapacity; - } else{ - LOG.warn( - "Could not find queue in scheduler while trying to deactivate " - + curLeafQueue); - } + if (!leafQueueEntitlements.containsKey(curLeafQueue)) { + leafQueueEntitlements.put(curLeafQueue, new QueueCapacities(false)); + // Activate queues if capacity is available } + + QueueCapacities capacities = leafQueueEntitlements.get(curLeafQueue); + updateCapacityFromTemplate(capacities, nodeLabel); } } @@ -528,17 +622,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy availableCapacity / childQueueAbsoluteCapacity); return Math.min(numLeafQueuesNeeded, 
numPendingApps); - } else{ - throw new SchedulerDynamicEditException("Child queue absolute capacity " - + "is initialized to 0. Check parent queue's " + managedParentQueue - .getQueueName() + " leaf queue template configuration"); } - } - - private float getAvailableCapacity(float parentAbsCapacity, - float deactivatedAbsCapacity, float totalChildQueueActivatedCapacity) { - return parentAbsCapacity - totalChildQueueActivatedCapacity - + deactivatedAbsCapacity + EPSILON; + return 0; } /** @@ -567,25 +652,27 @@ public class GuaranteedOrZeroCapacityOverTimePolicy AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue; - if (updatedQueueTemplate.getQueueCapacities().getCapacity() > 0) { - if (isActive(leafQueue)) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Queue is already active. Skipping activation : " + queue - .getQueuePath()); + for (String nodeLabel : updatedQueueTemplate.getQueueCapacities() + .getExistingNodeLabels()) { + if (updatedQueueTemplate.getQueueCapacities(). + getCapacity(nodeLabel) > 0) { + if (isActive(leafQueue, nodeLabel)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Queue is already active." + " Skipping activation : " + + queue.getQueuePath()); + } + } else{ + activate(leafQueue, nodeLabel); } } else{ - activate(leafQueue); - } - } else{ - if (!isActive(leafQueue)) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Queue is already de-activated. " + "Skipping de-activation " - + ": " + leafQueue.getQueuePath()); + if (!isActive(leafQueue, nodeLabel)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Queue is already de-activated. 
Skipping " + + "de-activation : " + leafQueue.getQueuePath()); + } + } else{ + deactivate(leafQueue, nodeLabel); } - } else{ - deactivate(leafQueue); } } } @@ -594,30 +681,26 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } } - private void activate(final AutoCreatedLeafQueue leafQueue) - throws SchedulerDynamicEditException { + private void activate(final AbstractAutoCreatedLeafQueue leafQueue, + String nodeLabel) throws SchedulerDynamicEditException { try { writeLock.lock(); - getLeafQueueState(leafQueue).activate(); - - parentQueueState.incAbsoluteActivatedChildCapacity(NO_LABEL, - leafQueueTemplateCapacities.getAbsoluteCapacity()); + getLeafQueueState(leafQueue, nodeLabel).activate(); + parentQueueState.incAbsoluteActivatedChildCapacity(nodeLabel, + leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel)); } finally { writeLock.unlock(); } } - private void deactivate(final AutoCreatedLeafQueue leafQueue) - throws SchedulerDynamicEditException { + private void deactivate(final AbstractAutoCreatedLeafQueue leafQueue, + String nodeLabel) throws SchedulerDynamicEditException { try { writeLock.lock(); - getLeafQueueState(leafQueue).deactivate(); + getLeafQueueState(leafQueue, nodeLabel).deactivate(); - for (String nodeLabel : managedParentQueue.getQueueCapacities() - .getExistingNodeLabels()) { - parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel, - leafQueueTemplateCapacities.getAbsoluteCapacity()); - } + parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel, + leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel)); } finally { writeLock.unlock(); } @@ -629,7 +712,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy @Override public void reinitialize(CapacitySchedulerContext schedulerContext, - final ParentQueue parentQueue) { + final ParentQueue parentQueue) throws IOException { if (!(parentQueue instanceof ManagedParentQueue)) { throw new IllegalStateException( "Expected instance of type " + ManagedParentQueue.class + " found 
" @@ -649,12 +732,11 @@ public class GuaranteedOrZeroCapacityOverTimePolicy //clear state parentQueueState.clear(); - clearLeafQueueState(); + leafQueueState.clear(); LOG.info( - "Reinitialized queue management policy for parent queue " - + parentQueue.getQueueName() +" with leaf queue template " - + "capacities : [" + "Reinitialized queue management policy for parent queue " + parentQueue + .getQueueName() + " with leaf queue template " + "capacities : [" + leafQueueTemplate.getQueueCapacities() + "]"); } @@ -663,51 +745,74 @@ public class GuaranteedOrZeroCapacityOverTimePolicy AbstractAutoCreatedLeafQueue leafQueue) throws SchedulerDynamicEditException { - if ( !(leafQueue instanceof AutoCreatedLeafQueue)) { - throw new SchedulerDynamicEditException("Not an instance of " - + "AutoCreatedLeafQueue : " + leafQueue.getClass()); + AutoCreatedLeafQueueConfig template; + + if (!(leafQueue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException( + "Not an instance of " + "AutoCreatedLeafQueue : " + leafQueue + .getClass()); } - AutoCreatedLeafQueue autoCreatedLeafQueue = - (AutoCreatedLeafQueue) leafQueue; - AutoCreatedLeafQueueConfig template = ZERO_CAPACITY_ENTITLEMENT; try { writeLock.lock(); - if (!addLeafQueueStateIfNotExists(leafQueue)) { - LOG.error("Leaf queue already exists in state : " + getLeafQueueState( - leafQueue)); - throw new SchedulerDynamicEditException( - "Leaf queue already exists in state : " + getLeafQueueState( - leafQueue)); - } - float availableCapacity = getAvailableCapacity( - managedParentQueue.getQueueCapacities().getAbsoluteCapacity(), 0, - parentQueueState.getAbsoluteActivatedChildQueueCapacity()); + QueueCapacities capacities = new QueueCapacities(false); + for (String nodeLabel : leafQueueTemplateNodeLabels) { + if (!leafQueueState.createLeafQueueStateIfNotExists(leafQueue, + nodeLabel)) { + String message = + "Leaf queue already exists in state : " + getLeafQueueState( + leafQueue, nodeLabel); + 
LOG.error(message); + } - if (availableCapacity >= leafQueueTemplateCapacities - .getAbsoluteCapacity()) { - activate(autoCreatedLeafQueue); - template = buildTemplate(leafQueueTemplateCapacities.getCapacity(), - leafQueueTemplateCapacities.getMaximumCapacity()); + float availableCapacity = managedParentQueue.getQueueCapacities(). + getAbsoluteCapacity(nodeLabel) - parentQueueState. + getAbsoluteActivatedChildQueueCapacity(nodeLabel) + EPSILON; + + if (availableCapacity >= leafQueueTemplateCapacities + .getAbsoluteCapacity(nodeLabel)) { + updateCapacityFromTemplate(capacities, nodeLabel); + activate(leafQueue, nodeLabel); + } else{ + updateToZeroCapacity(capacities, nodeLabel); + } } + + template = buildTemplate(capacities); } finally { writeLock.unlock(); } return template; } + private void updateToZeroCapacity(QueueCapacities capacities, + String nodeLabel) { + capacities.setCapacity(nodeLabel, 0.0f); + capacities.setMaximumCapacity(nodeLabel, + leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel)); + } + + private void updateCapacityFromTemplate(QueueCapacities capacities, + String nodeLabel) { + capacities.setCapacity(nodeLabel, + leafQueueTemplateCapacities.getCapacity(nodeLabel)); + capacities.setMaximumCapacity(nodeLabel, + leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel)); + } + @VisibleForTesting - LeafQueueState getLeafQueueState(LeafQueue queue) - throws SchedulerDynamicEditException { + LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue, + String partition) throws SchedulerDynamicEditException { try { readLock.lock(); String queueName = queue.getQueueName(); - if (!containsLeafQueue(queueName)) { + if (!leafQueueState.containsLeafQueue(queueName, partition)) { throw new SchedulerDynamicEditException( "Could not find leaf queue in " + "state " + queueName); } else{ - return leafQueueStateMap.get(queueName); + return leafQueueState. 
+ getLeafQueueStatePerPartition(queueName, partition); } } finally { readLock.unlock(); @@ -715,8 +820,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy } @VisibleForTesting - public float getAbsoluteActivatedChildQueueCapacity() { - return parentQueueState.getAbsoluteActivatedChildQueueCapacity(); + public float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) { + return parentQueueState.getAbsoluteActivatedChildQueueCapacity(nodeLabel); } private List<FiCaSchedulerApp> getSortedPendingApplications() { @@ -726,20 +831,10 @@ public class GuaranteedOrZeroCapacityOverTimePolicy return apps; } - private AutoCreatedLeafQueueConfig buildTemplate(float capacity, - float maxCapacity) { + private AutoCreatedLeafQueueConfig buildTemplate(QueueCapacities capacities) { AutoCreatedLeafQueueConfig.Builder templateBuilder = new AutoCreatedLeafQueueConfig.Builder(); - - QueueCapacities capacities = new QueueCapacities(false); templateBuilder.capacities(capacities); - - for (String nodeLabel : managedParentQueue.getQueueCapacities() - .getExistingNodeLabels()) { - capacities.setCapacity(nodeLabel, capacity); - capacities.setMaximumCapacity(nodeLabel, maxCapacity); - } - return new AutoCreatedLeafQueueConfig(templateBuilder); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java index 8765e86..8702c03 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java @@ -62,4 +62,12 @@ public class PendingAskUpdateResult { public String getNewNodePartition() { return newNodePartition; } + + @Override + public String toString() { + return "PendingAskUpdateResult{" + "lastPendingAsk=" + lastPendingAsk + + ", lastNodePartition='" + lastNodePartition + '\'' + + ", newPendingAsk=" + newPendingAsk + ", newNodePartition='" + + newNodePartition + '\'' + '}'; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 0a06e82..2e28395 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -65,6 +67,7 @@ public class MockNM { new HashMap<ContainerId, ContainerStatus>(); private Map<ApplicationId, AppCollectorData> registeringCollectors = new ConcurrentHashMap<>(); + private Set<NodeLabel> nodeLabels; public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory @@ -101,6 +104,13 @@ public class MockNM { nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1])); } + public MockNM(String nodeIdStr, Resource capability, + ResourceTrackerService resourceTracker, String version, Set<NodeLabel> + nodeLabels) { + this(nodeIdStr, capability, resourceTracker, version); + this.nodeLabels = nodeLabels; + } + public NodeId getNodeId() { return nodeId; } @@ -164,12 +174,17 @@ public class MockNM { List<ApplicationId> runningApplications) throws Exception { RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); + req.setNodeId(nodeId); req.setHttpPort(httpPort); req.setResource(capability); req.setContainerStatuses(containerReports); req.setNMVersion(version); req.setRunningApplications(runningApplications); + if ( nodeLabels != null && nodeLabels.size() > 0) { + req.setNodeLabels(nodeLabels); + } + RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 1a1b527..f6cdfec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -36,6 +36,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -247,10 +249,11 @@ public class TestAppManager{ private TestRMAppManager appMonitor; private ApplicationSubmissionContext asContext; private ApplicationId appId; + private QueueInfo mockDefaultQueueInfo; @SuppressWarnings("deprecation") @Before - 
public void setUp() { + public void setUp() throws IOException { long now = System.currentTimeMillis(); rmContext = mockRMContext(1, now - 10); @@ -258,6 +261,7 @@ public class TestAppManager{ .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class)); ResourceScheduler scheduler = mockResourceScheduler(); ((RMContextImpl)rmContext).setScheduler(scheduler); + Configuration conf = new Configuration(); conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); ((RMContextImpl) rmContext).setYarnConfiguration(conf); @@ -275,6 +279,11 @@ public class TestAppManager{ asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory)); asContext.setResource(mockResource()); asContext.setPriority(Priority.newInstance(0)); + asContext.setQueue("default"); + mockDefaultQueueInfo = mock(QueueInfo.class); + when(scheduler.getQueueInfo("default", false, false)) + .thenReturn(mockDefaultQueueInfo); + setupDispatcher(rmContext, conf); } @@ -709,6 +718,7 @@ public class TestAppManager{ for (ResourceRequest req : reqs) { req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); } + // setAMContainerResourceRequests has priority over // setAMContainerResourceRequest and setResource Assert.assertEquals(reqs, app.getAMResourceRequests()); @@ -722,6 +732,7 @@ public class TestAppManager{ ResourceRequest req = ResourceRequest.newInstance(Priority.newInstance(0), ResourceRequest.ANY, Resources.createResource(1025), 1, true); + req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); asContext.setAMContainerResourceRequest(cloneResourceRequest(req)); // getAMContainerResourceRequests uses a singleton list of // getAMContainerResourceRequest @@ -729,7 +740,6 @@ public class TestAppManager{ Assert.assertEquals(req, asContext.getAMContainerResourceRequests().get(0)); Assert.assertEquals(1, asContext.getAMContainerResourceRequests().size()); RMApp app = testRMAppSubmit(); - req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); // setAMContainerResourceRequest has 
priority over setResource Assert.assertEquals(Collections.singletonList(req), app.getAMResourceRequests()); @@ -740,10 +750,12 @@ public class TestAppManager{ asContext.setResource(Resources.createResource(1024)); asContext.setAMContainerResourceRequests(null); RMApp app = testRMAppSubmit(); + // setResource Assert.assertEquals(Collections.singletonList( ResourceRequest.newInstance(RMAppAttemptImpl.AM_CONTAINER_PRIORITY, - ResourceRequest.ANY, Resources.createResource(1024), 1, true, "")), + ResourceRequest.ANY, Resources.createResource(1024), 1, true, + "")), app.getAMResourceRequests()); } @@ -766,6 +778,8 @@ public class TestAppManager{ throws Exception { asContext.setResource(null); List<ResourceRequest> reqs = new ArrayList<>(); + when(mockDefaultQueueInfo.getAccessibleNodeLabels()).thenReturn + (new HashSet<String>() {{ add("label1"); add(""); }}); ResourceRequest anyReq = ResourceRequest.newInstance( Priority.newInstance(1), ResourceRequest.ANY, Resources.createResource(1024), 1, false, "label1", --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org