http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index b43f658..15d7c32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -60,10 +59,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; @@ -730,17 +729,22 @@ public class LeafQueue extends AbstractCSQueue { } private void handleExcessReservedContainer(Resource clusterResource, - CSAssignment assignment) { + CSAssignment assignment, FiCaSchedulerNode node, FiCaSchedulerApp app) { if (assignment.getExcessReservation() != null) { RMContainer excessReservedContainer = assignment.getExcessReservation(); - - completedContainer(clusterResource, assignment.getApplication(), - scheduler.getNode(excessReservedContainer.getAllocatedNode()), - excessReservedContainer, - SchedulerUtils.createAbnormalContainerStatus( - excessReservedContainer.getContainerId(), - SchedulerUtils.UNRESERVED_CONTAINER), - RMContainerEventType.RELEASED, null, false); + + if (excessReservedContainer.hasIncreaseReservation()) { + unreserveIncreasedContainer(clusterResource, + app, node, excessReservedContainer); + } else { + completedContainer(clusterResource, assignment.getApplication(), + scheduler.getNode(excessReservedContainer.getAllocatedNode()), + excessReservedContainer, + SchedulerUtils.createAbnormalContainerStatus( + excessReservedContainer.getContainerId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.RELEASED, null, false); + } assignment.setExcessReservation(null); } @@ -766,7 +770,8 @@ public class LeafQueue extends AbstractCSQueue { CSAssignment assignment = application.assignContainers(clusterResource, node, currentResourceLimits, schedulingMode, reservedContainer); - handleExcessReservedContainer(clusterResource, assignment); + handleExcessReservedContainer(clusterResource, assignment, node, + application); return assignment; } } @@ 
-824,7 +829,8 @@ public class LeafQueue extends AbstractCSQueue { // Did we schedule or reserve a container? Resource assigned = assignment.getResource(); - handleExcessReservedContainer(clusterResource, assignment); + handleExcessReservedContainer(clusterResource, assignment, node, + application); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) { @@ -836,7 +842,8 @@ public class LeafQueue extends AbstractCSQueue { // Book-keeping // Note: Update headroom to account for current allocation too... allocateResource(clusterResource, application, assigned, - node.getPartition(), reservedOrAllocatedRMContainer); + node.getPartition(), reservedOrAllocatedRMContainer, + assignment.isIncreasedAllocation()); // Done return assignment; @@ -1086,6 +1093,37 @@ public class LeafQueue extends AbstractCSQueue { } return true; } + + @Override + public void unreserveIncreasedContainer(Resource clusterResource, + FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) { + boolean removed = false; + Priority priority = null; + + synchronized (this) { + if (rmContainer.getContainer() != null) { + priority = rmContainer.getContainer().getPriority(); + } + + if (null != priority) { + removed = app.unreserve(rmContainer.getContainer().getPriority(), node, + rmContainer); + } + + if (removed) { + // Inform the ordering policy + orderingPolicy.containerReleased(app, rmContainer); + + releaseResource(clusterResource, app, rmContainer.getReservedResource(), + node.getPartition(), rmContainer, true); + } + } + + if (removed) { + getParent().unreserveIncreasedContainer(clusterResource, app, node, + rmContainer); + } + } @Override public void completedContainer(Resource clusterResource, @@ -1093,6 +1131,15 @@ public class LeafQueue extends AbstractCSQueue { ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue, boolean sortQueues) { if (application != null) { + // unreserve container increase request if it 
previously reserved. + if (rmContainer.hasIncreaseReservation()) { + unreserveIncreasedContainer(clusterResource, application, node, + rmContainer); + } + + // Remove container increase request if it exists + application.removeIncreaseRequest(node.getNodeID(), + rmContainer.getAllocatedPriority(), rmContainer.getContainerId()); boolean removed = false; @@ -1123,7 +1170,7 @@ public class LeafQueue extends AbstractCSQueue { orderingPolicy.containerReleased(application, rmContainer); releaseResource(clusterResource, application, container.getResource(), - node.getPartition(), rmContainer); + node.getPartition(), rmContainer, false); } } @@ -1137,8 +1184,10 @@ public class LeafQueue extends AbstractCSQueue { synchronized void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, - String nodePartition, RMContainer rmContainer) { - super.allocateResource(clusterResource, resource, nodePartition); + String nodePartition, RMContainer rmContainer, + boolean isIncreasedAllocation) { + super.allocateResource(clusterResource, resource, nodePartition, + isIncreasedAllocation); // handle ignore exclusivity container if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( @@ -1174,8 +1223,9 @@ public class LeafQueue extends AbstractCSQueue { synchronized void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource, String nodePartition, - RMContainer rmContainer) { - super.releaseResource(clusterResource, resource, nodePartition); + RMContainer rmContainer, boolean isChangeResource) { + super.releaseResource(clusterResource, resource, nodePartition, + isChangeResource); // handle ignore exclusivity container if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( @@ -1363,7 +1413,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, attempt, 
rmContainer.getContainer() - .getResource(), node.getPartition(), rmContainer); + .getResource(), node.getPartition(), rmContainer, false); } getParent().recoverContainer(clusterResource, attempt, rmContainer); } @@ -1412,7 +1462,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getPartition(), rmContainer); + .getResource(), node.getPartition(), rmContainer, false); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() @@ -1430,7 +1480,7 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); releaseResource(clusterResource, application, rmContainer.getContainer() - .getResource(), node.getPartition(), rmContainer); + .getResource(), node.getPartition(), rmContainer, false); LOG.info("movedContainer" + " container=" + rmContainer.getContainer() + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() @@ -1482,6 +1532,39 @@ public class LeafQueue extends AbstractCSQueue { public Priority getDefaultApplicationPriority() { return defaultAppPriorityPerQueue; } + + @Override + public void decreaseContainer(Resource clusterResource, + SchedContainerChangeRequest decreaseRequest, + FiCaSchedulerApp app) { + // If the container being decreased is reserved, we need to unreserve it + // first. 
+ RMContainer rmContainer = decreaseRequest.getRMContainer(); + if (rmContainer.hasIncreaseReservation()) { + unreserveIncreasedContainer(clusterResource, app, + (FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer); + } + + // Delta capacity is negative when it's a decrease request + Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity()); + + synchronized (this) { + // Delta is negative when it's a decrease request + releaseResource(clusterResource, app, absDelta, + decreaseRequest.getNodePartition(), decreaseRequest.getRMContainer(), + true); + // Notify application + app.decreaseContainer(decreaseRequest); + // Notify node + decreaseRequest.getSchedulerNode() + .decreaseContainer(decreaseRequest.getContainerId(), absDelta); + } + + // Notify parent + if (getParent() != null) { + getParent().decreaseContainer(clusterResource, decreaseRequest, app); + } + } public synchronized OrderingPolicy<FiCaSchedulerApp> getPendingAppsOrderingPolicy() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index e01204c..badab72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -430,7 +431,7 @@ public class ParentQueue extends AbstractCSQueue { assignedToChild.getResource(), Resources.none())) { // Track resource utilization for the parent-queue super.allocateResource(clusterResource, 
assignedToChild.getResource(), - node.getPartition()); + node.getPartition(), assignedToChild.isIncreasedAllocation()); // Track resource utilization in this pass of the scheduler Resources @@ -454,6 +455,8 @@ public class ParentQueue extends AbstractCSQueue { .addAll( assignedToChild.getAssignmentInformation() .getReservationDetails()); + assignment.setIncreasedAllocation(assignedToChild + .isIncreasedAllocation()); LOG.info("assignedContainer" + " queue=" + getQueueName() + @@ -616,6 +619,73 @@ public class ParentQueue extends AbstractCSQueue { } } + private synchronized void internalReleaseResource(Resource clusterResource, + FiCaSchedulerNode node, Resource releasedResource, boolean changeResource, + CSQueue completedChildQueue, boolean sortQueues) { + super.releaseResource(clusterResource, + releasedResource, node.getPartition(), + changeResource); + + if (LOG.isDebugEnabled()) { + LOG.debug("completedContainer " + this + ", cluster=" + clusterResource); + } + + // Note that this is using an iterator on the childQueues so this can't + // be called if already within an iterator for the childQueues. Like + // from assignContainersToChildQueues. + if (sortQueues) { + // reinsert the updated queue + for (Iterator<CSQueue> iter = childQueues.iterator(); iter.hasNext();) { + CSQueue csqueue = iter.next(); + if (csqueue.equals(completedChildQueue)) { + iter.remove(); + if (LOG.isDebugEnabled()) { + LOG.debug("Re-sorting completed queue: " + csqueue); + } + childQueues.add(csqueue); + break; + } + } + } + + // If we skipped sort queue this time, we need to resort queues to make + // sure we allocate from least usage (or order defined by queue policy) + // queues. 
+ needToResortQueuesAtNextAllocation = !sortQueues; + } + + @Override + public void decreaseContainer(Resource clusterResource, + SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app) { + // delta capacity is negative when it's a decrease request + Resource absDeltaCapacity = + Resources.negate(decreaseRequest.getDeltaCapacity()); + + internalReleaseResource(clusterResource, + csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false, + null, false); + + // Inform the parent + if (parent != null) { + parent.decreaseContainer(clusterResource, decreaseRequest, app); + } + } + + @Override + public void unreserveIncreasedContainer(Resource clusterResource, + FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) { + if (app != null) { + internalReleaseResource(clusterResource, node, + rmContainer.getReservedResource(), false, null, false); + + // Inform the parent + if (parent != null) { + parent.unreserveIncreasedContainer(clusterResource, app, node, + rmContainer); + } + } + } + @Override public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, @@ -623,40 +693,9 @@ public class ParentQueue extends AbstractCSQueue { RMContainerEventType event, CSQueue completedChildQueue, boolean sortQueues) { if (application != null) { - // Careful! Locking order is important! - // Book keeping - synchronized (this) { - super.releaseResource(clusterResource, rmContainer.getContainer() - .getResource(), node.getPartition()); - - if (LOG.isDebugEnabled()) { - LOG.debug("completedContainer " + this + ", cluster=" + clusterResource); - } - - // Note that this is using an iterator on the childQueues so this can't - // be called if already within an iterator for the childQueues. Like - // from assignContainersToChildQueues. 
- if (sortQueues) { - // reinsert the updated queue - for (Iterator<CSQueue> iter = childQueues.iterator(); - iter.hasNext();) { - CSQueue csqueue = iter.next(); - if(csqueue.equals(completedChildQueue)) { - iter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Re-sorting completed queue: " + csqueue); - } - childQueues.add(csqueue); - break; - } - } - } - - // If we skipped sort queue this time, we need to resort queues to make - // sure we allocate from least usage (or order defined by queue policy) - // queues. - needToResortQueuesAtNextAllocation = !sortQueues; - } + internalReleaseResource(clusterResource, node, + rmContainer.getContainer().getResource(), false, completedChildQueue, + sortQueues); // Inform the parent if (parent != null) { @@ -698,7 +737,7 @@ public class ParentQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); super.allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), node.getPartition()); + .getResource(), node.getPartition(), false); } if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); @@ -726,7 +765,7 @@ public class ParentQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); super.allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), node.getPartition()); + .getResource(), node.getPartition(), false); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" @@ -746,7 +785,7 @@ public class ParentQueue extends AbstractCSQueue { scheduler.getNode(rmContainer.getContainer().getNodeId()); super.releaseResource(clusterResource, rmContainer.getContainer().getResource(), - node.getPartition()); + node.getPartition(), false); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " 
usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java new file mode 100644 index 0000000..b986b1f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * For an application, resource limits and resource requests, decide how to + * allocate container. This is to make application resource allocation logic + * extensible. 
+ */ +public abstract class AbstractContainerAllocator { + private static final Log LOG = LogFactory.getLog(AbstractContainerAllocator.class); + + FiCaSchedulerApp application; + final ResourceCalculator rc; + final RMContext rmContext; + + public AbstractContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext) { + this.application = application; + this.rc = rc; + this.rmContext = rmContext; + } + + protected CSAssignment getCSAssignmentFromAllocateResult( + Resource clusterResource, ContainerAllocation result, + RMContainer rmContainer) { + // Handle skipped + boolean skipped = + (result.getAllocationState() == AllocationState.APP_SKIPPED); + CSAssignment assignment = new CSAssignment(skipped); + assignment.setApplication(application); + + // Handle excess reservation + assignment.setExcessReservation(result.getContainerToBeUnreserved()); + + // If we allocated something + if (Resources.greaterThan(rc, clusterResource, + result.getResourceToBeAllocated(), Resources.none())) { + Resource allocatedResource = result.getResourceToBeAllocated(); + Container updatedContainer = result.getUpdatedContainer(); + + assignment.setResource(allocatedResource); + assignment.setType(result.getContainerNodeType()); + + if (result.getAllocationState() == AllocationState.RESERVED) { + // This is a reserved container + LOG.info("Reserved container " + " application=" + + application.getApplicationId() + " resource=" + allocatedResource + + " queue=" + this.toString() + " cluster=" + clusterResource); + assignment.getAssignmentInformation().addReservationDetails( + updatedContainer.getId(), + application.getCSLeafQueue().getQueuePath()); + assignment.getAssignmentInformation().incrReservations(); + Resources.addTo(assignment.getAssignmentInformation().getReserved(), + allocatedResource); + } else if (result.getAllocationState() == AllocationState.ALLOCATED){ + // This is a new container + // Inform the ordering policy + 
LOG.info("assignedContainer" + " application attempt=" + + application.getApplicationAttemptId() + " container=" + + updatedContainer.getId() + " queue=" + this + " clusterResource=" + + clusterResource); + + application + .getCSLeafQueue() + .getOrderingPolicy() + .containerAllocated(application, + application.getRMContainer(updatedContainer.getId())); + + assignment.getAssignmentInformation().addAllocationDetails( + updatedContainer.getId(), + application.getCSLeafQueue().getQueuePath()); + assignment.getAssignmentInformation().incrAllocations(); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + allocatedResource); + + if (rmContainer != null) { + assignment.setFulfilledReservation(true); + } + } + } + + return assignment; + } + + /** + * allocate needs to handle following stuffs: + * + * <ul> + * <li>Select request: Select a request to allocate. E.g. select a resource + * request based on requirement/priority/locality.</li> + * <li>Check if a given resource can be allocated based on resource + * availability</li> + * <li>Do allocation: this will decide/create allocated/reserved + * container, this will also update metrics</li> + * </ul> + */ + public abstract CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, RMContainer reservedContainer); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 6e296cd..3be8e0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -18,13 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -33,118 +30,50 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -/** - * For an application, resource limits and resource requests, decide how to - * allocate container. This is to make application resource allocation logic - * extensible. 
- */ -public abstract class ContainerAllocator { - private static final Log LOG = LogFactory.getLog(ContainerAllocator.class); +public class ContainerAllocator extends AbstractContainerAllocator { + AbstractContainerAllocator increaseContainerAllocator; + AbstractContainerAllocator regularContainerAllocator; - FiCaSchedulerApp application; - final ResourceCalculator rc; - final RMContext rmContext; - public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { - this.application = application; - this.rc = rc; - this.rmContext = rmContext; - } + super(application, rc, rmContext); - protected boolean checkHeadroom(Resource clusterResource, - ResourceLimits currentResourceLimits, Resource required, - FiCaSchedulerNode node) { - // If headroom + currentReservation < required, we cannot allocate this - // require - Resource resourceCouldBeUnReserved = application.getCurrentReservation(); - if (!application.getCSLeafQueue().getReservationContinueLooking() - || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { - // If we don't allow reservation continuous looking, OR we're looking at - // non-default node partition, we won't allow to unreserve before - // allocation. 
- resourceCouldBeUnReserved = Resources.none(); - } - return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add( - currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), - required); + increaseContainerAllocator = + new IncreaseContainerAllocator(application, rc, rmContext); + regularContainerAllocator = + new RegularContainerAllocator(application, rc, rmContext); } - protected CSAssignment getCSAssignmentFromAllocateResult( - Resource clusterResource, ContainerAllocation result, - RMContainer rmContainer) { - // Handle skipped - boolean skipped = - (result.getAllocationState() == AllocationState.APP_SKIPPED); - CSAssignment assignment = new CSAssignment(skipped); - assignment.setApplication(application); - - // Handle excess reservation - assignment.setExcessReservation(result.getContainerToBeUnreserved()); - - // If we allocated something - if (Resources.greaterThan(rc, clusterResource, - result.getResourceToBeAllocated(), Resources.none())) { - Resource allocatedResource = result.getResourceToBeAllocated(); - Container updatedContainer = result.getUpdatedContainer(); - - assignment.setResource(allocatedResource); - assignment.setType(result.getContainerNodeType()); - - if (result.getAllocationState() == AllocationState.RESERVED) { - // This is a reserved container - LOG.info("Reserved container " + " application=" - + application.getApplicationId() + " resource=" + allocatedResource - + " queue=" + this.toString() + " cluster=" + clusterResource); - assignment.getAssignmentInformation().addReservationDetails( - updatedContainer.getId(), - application.getCSLeafQueue().getQueuePath()); - assignment.getAssignmentInformation().incrReservations(); - Resources.addTo(assignment.getAssignmentInformation().getReserved(), - allocatedResource); - } else if (result.getAllocationState() == AllocationState.ALLOCATED){ - // This is a new container - // Inform the ordering policy - LOG.info("assignedContainer" + " application attempt=" - + 
application.getApplicationAttemptId() + " container=" - + updatedContainer.getId() + " queue=" + this + " clusterResource=" - + clusterResource); - - application - .getCSLeafQueue() - .getOrderingPolicy() - .containerAllocated(application, - application.getRMContainer(updatedContainer.getId())); - - assignment.getAssignmentInformation().addAllocationDetails( - updatedContainer.getId(), - application.getCSLeafQueue().getQueuePath()); - assignment.getAssignmentInformation().incrAllocations(); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - allocatedResource); - - if (rmContainer != null) { - assignment.setFulfilledReservation(true); - } + @Override + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, RMContainer reservedContainer) { + if (reservedContainer != null) { + if (reservedContainer.getState() == RMContainerState.RESERVED) { + // It's a regular container + return regularContainerAllocator.assignContainers(clusterResource, + node, schedulingMode, resourceLimits, reservedContainer); + } else { + // It's a increase container + return increaseContainerAllocator.assignContainers(clusterResource, + node, schedulingMode, resourceLimits, reservedContainer); + } + } else { + /* + * Try to allocate increase container first, and if we failed to allocate + * anything, we will try to allocate regular container + */ + CSAssignment assign = + increaseContainerAllocator.assignContainers(clusterResource, node, + schedulingMode, resourceLimits, null); + if (Resources.greaterThan(rc, clusterResource, assign.getResource(), + Resources.none())) { + return assign; } + + return regularContainerAllocator.assignContainers(clusterResource, node, + schedulingMode, resourceLimits, null); } - - return assignment; } - - /** - * allocate needs to handle following stuffs: - * - * <ul> - * <li>Select request: Select a request to allocate. E.g. 
select a resource - * request based on requirement/priority/locality.</li> - * <li>Check if a given resource can be allocated based on resource - * availability</li> - * <li>Do allocation: this will decide/create allocated/reserved - * container, this will also update metrics</li> - * </ul> - */ - public abstract CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, - ResourceLimits resourceLimits, RMContainer reservedContainer); -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java new file mode 100644 index 0000000..9350adc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import 
org.apache.hadoop.yarn.util.resource.Resources; + +public class IncreaseContainerAllocator extends AbstractContainerAllocator { + private static final Log LOG = + LogFactory.getLog(IncreaseContainerAllocator.class); + + public IncreaseContainerAllocator(FiCaSchedulerApp application, + ResourceCalculator rc, RMContext rmContext) { + super(application, rc, rmContext); + } + + /** + * Quick check if we can allocate anything here: + * We will not continue if: + * - Headroom cannot accommodate the required resource (the scheduler's + * minimum allocation, or an increase request's delta capacity). + */ + private boolean checkHeadroom(Resource clusterResource, + ResourceLimits currentResourceLimits, Resource required) { + return Resources.greaterThanOrEqual(rc, clusterResource, + currentResourceLimits.getHeadroom(), required); + } + + private CSAssignment createReservedIncreasedCSAssignment( + SchedContainerChangeRequest request) { + CSAssignment assignment = + new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null, + application, false, false); + Resources.addTo(assignment.getAssignmentInformation().getReserved(), + request.getDeltaCapacity()); + assignment.getAssignmentInformation().incrReservations(); + assignment.getAssignmentInformation().addReservationDetails( + request.getContainerId(), application.getCSLeafQueue().getQueuePath()); + assignment.setIncreasedAllocation(true); + + LOG.info("Reserved increase container request:" + request.toString()); + + return assignment; + } + + private CSAssignment createSuccessfullyIncreasedCSAssignment( + SchedContainerChangeRequest request, boolean fromReservation) { + CSAssignment assignment = + new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null, + application, false, fromReservation); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + request.getDeltaCapacity()); + assignment.getAssignmentInformation().incrAllocations(); + 
assignment.getAssignmentInformation().addAllocationDetails( + request.getContainerId(),
application.getCSLeafQueue().getQueuePath()); + assignment.setIncreasedAllocation(true); + + // notify application + application + .getCSLeafQueue() + .getOrderingPolicy() + .containerAllocated(application, + application.getRMContainer(request.getContainerId())); + + LOG.info("Approved increase container request:" + request.toString() + + " fromReservation=" + fromReservation); + + return assignment; + } + + private CSAssignment allocateIncreaseRequestFromReservedContainer( + SchedulerNode node, Resource cluster, + SchedContainerChangeRequest increaseRequest) { + if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(), + node.getAvailableResource())) { + // OK, we can allocate this increase request + // Unreserve it first + application.unreserve(increaseRequest.getPriority(), + (FiCaSchedulerNode) node, increaseRequest.getRMContainer()); + + // Notify application + application.increaseContainer(increaseRequest); + + // Notify node + node.increaseContainer(increaseRequest.getContainerId(), + increaseRequest.getDeltaCapacity()); + + return createSuccessfullyIncreasedCSAssignment(increaseRequest, true); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to allocate reserved increase request:" + + increaseRequest.toString() + + ". 
There's no enough available resource"); + } + + // We still cannot allocate this container, will wait for next turn + return CSAssignment.SKIP_ASSIGNMENT; + } + } + + private CSAssignment allocateIncreaseRequest(FiCaSchedulerNode node, + Resource cluster, SchedContainerChangeRequest increaseRequest) { + if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(), + node.getAvailableResource())) { + // Notify node + node.increaseContainer(increaseRequest.getContainerId(), + increaseRequest.getDeltaCapacity()); + + // OK, we can allocate this increase request + // Notify application + application.increaseContainer(increaseRequest); + return createSuccessfullyIncreasedCSAssignment(increaseRequest, false); + } else { + boolean reservationSucceeded = + application.reserveIncreasedContainer(increaseRequest.getPriority(), + node, increaseRequest.getRMContainer(), + increaseRequest.getDeltaCapacity()); + + if (reservationSucceeded) { + // We cannot allocate this container, but since queue capacity / + // user-limit matches, we can reserve this container on this node. + return createReservedIncreasedCSAssignment(increaseRequest); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Reserve increase request=" + increaseRequest.toString() + + " failed. Skipping.."); + } + return CSAssignment.SKIP_ASSIGNMENT; + } + } + } + + @Override + public CSAssignment assignContainers(Resource clusterResource, + FiCaSchedulerNode node, SchedulingMode schedulingMode, + ResourceLimits resourceLimits, RMContainer reservedContainer) { + AppSchedulingInfo sinfo = application.getAppSchedulingInfo(); + NodeId nodeId = node.getNodeID(); + + if (reservedContainer == null) { + // Do we have increase request on this node?
+ if (!sinfo.hasIncreaseRequest(nodeId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip allocating increase request since we don't have any" + + " increase request on this node=" + node.getNodeID()); + } + + return CSAssignment.SKIP_ASSIGNMENT; + } + + // Check if we need to unreserve something, note that we don't support + // continuousReservationLooking now. TODO, need think more about how to + // support it. + boolean shouldUnreserve = + Resources.greaterThan(rc, clusterResource, + resourceLimits.getAmountNeededUnreserve(), Resources.none()); + + // Check if we can allocate minimum resource according to headroom + boolean cannotAllocateAnything = + !checkHeadroom(clusterResource, resourceLimits, rmContext + .getScheduler().getMinimumResourceCapability()); + + // Skip the app if we failed either of above check + if (cannotAllocateAnything || shouldUnreserve) { + if (LOG.isDebugEnabled()) { + if (shouldUnreserve) { + LOG.debug("Cannot continue since we have to unreserve some resource" + + ", now increase container allocation doesn't " + + "support continuous reservation looking.."); + } + if (cannotAllocateAnything) { + LOG.debug("We cannot allocate anything because of low headroom, " + + "headroom=" + resourceLimits.getHeadroom()); + } + } + + return CSAssignment.SKIP_ASSIGNMENT; + } + + CSAssignment assigned = null; + + /* + * Loop each priority, and containerId. Container priority is not + * equivalent to request priority, application master can run an important + * task on a less prioritized container.
+ * + * So behavior here is, we still try to increase container with higher + * priority, but will skip increase request and move to next increase + * request if queue-limit or user-limit aren't satisfied + */ + for (Priority priority : application.getPriorities()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Looking at increase request for application=" + + application.getApplicationAttemptId() + " priority=" + + priority); + } + + /* + * If we have multiple to-be-increased containers under same priority on + * a same host, we will try to increase earlier launched container + * first. And again - we will skip a request and move to next if it + * cannot be allocated. + */ + Map<ContainerId, SchedContainerChangeRequest> increaseRequestMap = + sinfo.getIncreaseRequests(nodeId, priority); + + // We don't have more increase request on this priority, skip.. + if (null == increaseRequestMap) { + if (LOG.isDebugEnabled()) { + LOG.debug("There's no increase request for " + + application.getApplicationAttemptId() + " priority=" + + priority); + } + continue; + } + Iterator<Entry<ContainerId, SchedContainerChangeRequest>> iter = + increaseRequestMap.entrySet().iterator(); + List<SchedContainerChangeRequest> toBeRemovedRequests = + new ArrayList<>(); + + while (iter.hasNext()) { + Entry<ContainerId, SchedContainerChangeRequest> entry = + iter.next(); + SchedContainerChangeRequest increaseRequest = + entry.getValue(); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Looking at increase request=" + increaseRequest.toString()); + } + + boolean headroomSatisfied = checkHeadroom(clusterResource, + resourceLimits, increaseRequest.getDeltaCapacity()); + if (!headroomSatisfied) { + // skip if doesn't satisfy headroom limit + if (LOG.isDebugEnabled()) { + LOG.debug(" Headroom is not satisfied, skip.."); + } + continue; + } + + RMContainer rmContainer = increaseRequest.getRMContainer(); + if (rmContainer.getContainerState() != ContainerState.RUNNING) { + // if the container is not
running, we should remove the + // increaseRequest and continue; + if (LOG.isDebugEnabled()) { + LOG.debug(" Container is not running any more, skip..."); + } + toBeRemovedRequests.add(increaseRequest); + continue; + } + + if (!Resources.fitsIn(rc, clusterResource, + increaseRequest.getTargetCapacity(), node.getTotalResource())) { + // if the target capacity is more than what the node can offer, we + // will simply remove and skip it. + // The reason of doing check here instead of adding increase request + // to scheduler because node's resource could be updated after + // request added. + if (LOG.isDebugEnabled()) { + LOG.debug(" Target capacity is more than what node can offer," + + " node.resource=" + node.getTotalResource()); + } + toBeRemovedRequests.add(increaseRequest); + continue; + } + + // Try to allocate the increase request + assigned = + allocateIncreaseRequest(node, clusterResource, increaseRequest); + if (!assigned.getSkipped()) { + // When we don't skip this request, which means we either allocated + // OR reserved this request. We will break + break; + } + } + + // Remove invalid increase requests + if (!toBeRemovedRequests.isEmpty()) { + for (SchedContainerChangeRequest req : toBeRemovedRequests) { + sinfo.removeIncreaseRequest(req.getNodeId(), req.getPriority(), + req.getContainerId()); + } + } + + // Stop once allocated/reserved (null: no request reached allocateIncreaseRequest) + if (assigned != null && !assigned.getSkipped()) { + break; + } + } + + return assigned == null ?
CSAssignment.SKIP_ASSIGNMENT : assigned; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to allocate reserved increase container request.."); + } + + // We already reserved this increase container + SchedContainerChangeRequest request = + sinfo.getIncreaseRequest(nodeId, reservedContainer.getContainer() + .getPriority(), reservedContainer.getContainerId()); + + // We will cancel the reservation if any of the following happens + // - Container finished + // - No increase request needed + // - Target resource updated + if (null == request + || reservedContainer.getContainerState() != ContainerState.RUNNING + || (!Resources.equals(reservedContainer.getReservedResource(), + request.getDeltaCapacity()))) { + if (LOG.isDebugEnabled()) { + LOG.debug("We don't need reserved increase container request " + + "for container=" + reservedContainer.getContainerId() + + ". Unreserving and return..."); + } + + // We don't need this container now, just return excessive reservation + return new CSAssignment(application, reservedContainer); + } + + return allocateIncreaseRequestFromReservedContainer(node, clusterResource, + request); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index dcb99ed..fd99d29 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; * Allocate normal (new) containers, considers locality/label, etc. Using * delayed scheduling mechanism to get better locality allocation. */ -public class RegularContainerAllocator extends ContainerAllocator { +public class RegularContainerAllocator extends AbstractContainerAllocator { private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); private ResourceRequest lastResourceRequest = null; @@ -56,6 +56,25 @@ public class RegularContainerAllocator extends ContainerAllocator { super(application, rc, rmContext); } + private boolean checkHeadroom(Resource clusterResource, + ResourceLimits currentResourceLimits, Resource required, + FiCaSchedulerNode node) { + // If headroom + currentReservation < required, we cannot allocate this + // require + Resource resourceCouldBeUnReserved = application.getCurrentReservation(); + if (!application.getCSLeafQueue().getReservationContinueLooking() + || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { + // If we don't allow reservation continuous looking, OR we're looking at + // non-default node partition, we won't allow to unreserve before + // allocation. 
+ resourceCouldBeUnReserved = Resources.none(); + } + return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add( + currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved), + required); + } + + private ContainerAllocation preCheckForNewContainer(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, ResourceLimits resourceLimits, Priority priority) { @@ -97,8 +116,9 @@ public class RegularContainerAllocator extends ContainerAllocator { // Is the node-label-expression of this offswitch resource request // matches the node's label? // If not match, jump to next priority. - if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(anyRequest, - node.getPartition(), schedulingMode)) { + if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( + anyRequest.getNodeLabelExpression(), node.getPartition(), + schedulingMode)) { return ContainerAllocation.PRIORITY_SKIPPED; } @@ -388,8 +408,8 @@ public class RegularContainerAllocator extends ContainerAllocator { } // check if the resource request can access the label - if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request, - node.getPartition(), schedulingMode)) { + if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( + request.getNodeLabelExpression(), node.getPartition(), schedulingMode)) { // this is a reserved container, but we cannot allocate it now according // to label not match. This can be caused by node label changed // We should un-reserve this container. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 300cba9..e97da24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -58,7 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -83,7 +84,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { private ResourceScheduler scheduler; - private ContainerAllocator containerAllocator; + private AbstractContainerAllocator containerAllocator; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -118,7 +119,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { rc = scheduler.getResourceCalculator(); } - containerAllocator = new RegularContainerAllocator(this, rc, rmContext); + containerAllocator = new ContainerAllocator(this, rc, rmContext); } synchronized public boolean containerCompleted(RMContainer rmContainer, @@ -207,22 +208,24 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return rmContainer; } - public boolean unreserve(Priority priority, + public synchronized boolean unreserve(Priority priority, FiCaSchedulerNode node, RMContainer rmContainer) { + // Cancel increase request (if it has reserved increase request + rmContainer.cancelIncreaseReservation(); + // Done with the reservation? 
- if (unreserve(node, priority)) { + if (internalUnreserve(node, priority)) { node.unreserveResource(this); // Update reserved metrics queue.getMetrics().unreserveResource(getUser(), - rmContainer.getContainer().getResource()); + rmContainer.getReservedResource()); return true; } return false; } - @VisibleForTesting - public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) { + private boolean internalUnreserve(FiCaSchedulerNode node, Priority priority) { Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(priority); @@ -241,7 +244,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Reset the re-reservation count resetReReservations(priority); - Resource resource = reservedContainer.getContainer().getResource(); + Resource resource = reservedContainer.getReservedResource(); this.attemptResourceUsage.decReserved(node.getPartition(), resource); LOG.info("Application " + getApplicationId() + " unreserved " @@ -311,13 +314,15 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { ResourceRequest rr = ResourceRequest.newInstance( Priority.UNDEFINED, ResourceRequest.ANY, minimumAllocation, numCont); - ContainersAndNMTokensAllocation allocation = - pullNewlyAllocatedContainersAndNMTokens(); + List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers(); + List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers(); + List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers(); + List<NMToken> updatedNMTokens = pullUpdatedNMTokens(); Resource headroom = getHeadroom(); setApplicationHeadroomForMetrics(headroom); - return new Allocation(allocation.getContainerList(), headroom, null, - currentContPreemption, Collections.singletonList(rr), - allocation.getNMTokenList()); + return new Allocation(newlyAllocatedContainers, headroom, null, + currentContPreemption, Collections.singletonList(rr), updatedNMTokens, + newlyIncreasedContainers, 
newlyDecreasedContainers); } synchronized public NodeId getNodeIdToUnreserve(Priority priority, @@ -332,15 +337,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { if ((reservedContainers != null) && (!reservedContainers.isEmpty())) { for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) { NodeId nodeId = entry.getKey(); - Resource containerResource = entry.getValue().getContainer().getResource(); + RMContainer reservedContainer = entry.getValue(); + if (reservedContainer.hasIncreaseReservation()) { + // Currently, only regular container allocation supports continuous + // reservation looking, we don't support canceling increase request + // reservation when allocating regular container. + continue; + } + + Resource reservedResource = reservedContainer.getReservedResource(); // make sure we unreserve one with at least the same amount of // resources, otherwise could affect capacity limits - if (Resources.lessThanOrEqual(rc, clusterResource, - resourceNeedUnreserve, containerResource)) { + if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve, + reservedResource)) { if (LOG.isDebugEnabled()) { LOG.debug("unreserving node with reservation size: " - + containerResource + + reservedResource + " in order to allocate container with size: " + resourceNeedUnreserve); } return nodeId; @@ -374,6 +387,25 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { this.headroomProvider = ((FiCaSchedulerApp) appAttempt).getHeadroomProvider(); } + + public boolean reserveIncreasedContainer(Priority priority, + FiCaSchedulerNode node, + RMContainer rmContainer, Resource reservedResource) { + // Inform the application + if (super.reserveIncreasedContainer(node, priority, rmContainer, + reservedResource)) { + + queue.getMetrics().reserveResource(getUser(), reservedResource); + + // Update the node + node.reserveResource(this, priority, rmContainer); + + // Succeeded + return true; + } + + return false; + } public void 
reserve(Priority priority, FiCaSchedulerNode node, RMContainer rmContainer, Container container) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3a39799..69654e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -19,7 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -32,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import 
org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -68,7 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -904,7 +913,9 @@ public class FairScheduler extends @Override public Allocation allocate(ApplicationAttemptId appAttemptId, List<ResourceRequest> ask, List<ContainerId> release, - List<String> blacklistAdditions, List<String> blacklistRemovals) { + List<String> blacklistAdditions, List<String> blacklistRemovals, + List<ContainerResourceChangeRequest> increaseRequests, + List<ContainerResourceChangeRequest> decreaseRequests) { // Make sure this application exists FSAppAttempt application = getSchedulerApp(appAttemptId); @@ -963,18 +974,17 @@ public class FairScheduler extends application.updateBlacklist(blacklistAdditions, blacklistRemovals); } - ContainersAndNMTokensAllocation allocation = - application.pullNewlyAllocatedContainersAndNMTokens(); - + List<Container> newlyAllocatedContainers = + application.pullNewlyAllocatedContainers(); // Record container allocation time - if (!(allocation.getContainerList().isEmpty())) 
{ + if (!(newlyAllocatedContainers.isEmpty())) { application.recordContainerAllocationTime(getClock().getTime()); } Resource headroom = application.getHeadroom(); application.setApplicationHeadroomForMetrics(headroom); - return new Allocation(allocation.getContainerList(), headroom, - preemptionContainerIds, null, null, allocation.getNMTokenList()); + return new Allocation(newlyAllocatedContainers, headroom, + preemptionContainerIds, null, null, application.pullUpdatedNMTokens()); } } @@ -1706,4 +1716,11 @@ public class FairScheduler extends } return targetQueueName; } + + @Override + protected void decreaseContainer( + SchedContainerChangeRequest decreaseRequest, + SchedulerApplicationAttempt attempt) { + // TODO Auto-generated method stub + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 99760df..2ec2311 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -76,7 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -310,9 +311,11 @@ public class FifoScheduler extends } @Override - public Allocation allocate( - ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, - List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) { + public Allocation allocate(ApplicationAttemptId applicationAttemptId, + List<ResourceRequest> ask, List<ContainerId> release, + List<String> blacklistAdditions, List<String> blacklistRemovals, + List<ContainerResourceChangeRequest> increaseRequests, + List<ContainerResourceChangeRequest> decreaseRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed " + @@ -364,12 +367,10 @@ public class FifoScheduler extends 
application.updateBlacklist(blacklistAdditions, blacklistRemovals); } - ContainersAndNMTokensAllocation allocation = - application.pullNewlyAllocatedContainersAndNMTokens(); Resource headroom = application.getHeadroom(); application.setApplicationHeadroomForMetrics(headroom); - return new Allocation(allocation.getContainerList(), headroom, null, - null, null, allocation.getNMTokenList()); + return new Allocation(application.pullNewlyAllocatedContainers(), + headroom, null, null, null, application.pullUpdatedNMTokens()); } } @@ -1005,4 +1006,12 @@ public class FifoScheduler extends public Resource getUsedResource() { return usedResource; } + + @Override + protected void decreaseContainer( + SchedContainerChangeRequest decreaseRequest, + SchedulerApplicationAttempt attempt) { + // TODO Auto-generated method stub + + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index e62f7d7..b536546 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -323,7 +323,7 @@ public class Application { // Get resources from the ResourceManager Allocation allocation = resourceManager.getResourceScheduler().allocate( 
applicationAttemptId, new ArrayList<ResourceRequest>(ask), - new ArrayList<ContainerId>(), null, null); + new ArrayList<ContainerId>(), null, null, null, null); System.out.println("-=======" + applicationAttemptId); System.out.println("----------" + resourceManager.getRMContext().getRMApps() .get(applicationId).getRMAppAttempt(applicationAttemptId)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 5660b78..c325a65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -235,6 +236,14 @@ public class MockAM { releases, null); return allocate(req); } + + public AllocateResponse sendContainerResizingRequest( + 
List<ContainerResourceChangeRequest> increaseRequests, + List<ContainerResourceChangeRequest> decreaseRequests) throws Exception { + final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null, + null, increaseRequests, decreaseRequests); + return allocate(req); + } public AllocateResponse allocate(AllocateRequest allocateRequest) throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 53cb8d0..92f3edf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -231,6 +233,17 @@ public class MockNodes { } return CommonNodeLabelsManager.EMPTY_STRING_SET; } + + @Override + public void 
updateNodeHeartbeatResponseForContainersDecreasing( + NodeHeartbeatResponse response) { + + } + + @Override + public List<Container> pullNewlyIncreasedContainers() { + return Collections.emptyList(); + } }; private static RMNode buildRMNode(int rack, final Resource perNode, http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index e464401..7263b74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -189,6 +189,19 @@ public class MockRM extends ResourceManager { } } + public void waitForContainerState(ContainerId containerId, + RMContainerState state) throws Exception { + int timeoutSecs = 0; + RMContainer container = getResourceScheduler().getRMContainer(containerId); + while ((container == null || container.getState() != state) + && timeoutSecs++ < 40) { + System.out.println( + "Waiting for" + containerId + " state to be:" + state.name()); + Thread.sleep(200); + } + Assert.assertTrue(container.getState() == state); + } + public void waitForContainerAllocated(MockNM nm, ContainerId containerId) throws Exception { int timeoutSecs = 0;
