http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index a69af6e..fd0c68b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -47,8 +47,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; @@ -71,12 +76,10 @@ public class ParentQueue extends AbstractCSQueue { protected final Set<CSQueue> childQueues; private final boolean rootQueue; - final Comparator<CSQueue> nonPartitionedQueueComparator; - final PartitionedQueueComparator partitionQueueComparator; - volatile int numApplications; + private final Comparator<CSQueue> nonPartitionedQueueComparator; + private final PartitionedQueueComparator partitionQueueComparator; + private volatile int numApplications; private final CapacitySchedulerContext scheduler; - private boolean needToResortQueuesAtNextAllocation = false; - private int offswitchPerHeartbeatLimit; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -86,7 +89,7 @@ public class ParentQueue extends AbstractCSQueue { super(cs, queueName, parent, old); this.scheduler = cs; this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator(); - this.partitionQueueComparator = cs.getPartitionedQueueComparator(); + this.partitionQueueComparator = new PartitionedQueueComparator(); this.rootQueue = (parent == null); @@ -126,16 +129,12 @@ public class ParentQueue extends AbstractCSQueue { } } - offswitchPerHeartbeatLimit = - csContext.getConfiguration().getOffSwitchPerHeartbeatLimit(); - LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity() + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", absoluteMaxCapacity=" + this.queueCapacities .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" - + ", offswitchPerHeartbeatLimit = " + getOffSwitchPerHeartbeatLimit() + ", 
reservationsContinueLooking=" + reservationsContinueLooking); } finally { writeLock.unlock(); @@ -215,11 +214,6 @@ public class ParentQueue extends AbstractCSQueue { } - @Private - public int getOffSwitchPerHeartbeatLimit() { - return offswitchPerHeartbeatLimit; - } - private QueueUserACLInfo getUserAclInfo( UserGroupInformation user) { try { @@ -435,156 +429,145 @@ public class ParentQueue extends AbstractCSQueue { @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits resourceLimits, - SchedulingMode schedulingMode) { - int offswitchCount = 0; - try { - writeLock.lock(); - // if our queue cannot access this node, just return - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(node.getPartition())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it is not able to access partition=" + node + PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits, + SchedulingMode schedulingMode) { + FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + + // if our queue cannot access this node, just return + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY + && !accessibleToPartition(ps.getPartition())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it is not able to access partition=" + ps + .getPartition()); + } + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node .getPartition()); - } + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.REJECTED, - ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node - .getPartition()); - if 
(rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } + return CSAssignment.NULL_ASSIGNMENT; + } - return CSAssignment.NULL_ASSIGNMENT; + // Check if this queue need more resource, simply skip allocation if this + // queue doesn't need more resources. + if (!super.hasPendingResourceRequest(ps.getPartition(), clusterResource, + schedulingMode)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it doesn't need more resource, schedulingMode=" + + schedulingMode.name() + " node-partition=" + ps + .getPartition()); } - // Check if this queue need more resource, simply skip allocation if this - // queue doesn't need more resources. - if (!super.hasPendingResourceRequest(node.getPartition(), clusterResource, - schedulingMode)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + node - .getPartition()); - } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + + return CSAssignment.NULL_ASSIGNMENT; + } + + CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), + NodeType.NODE_LOCAL); + + while (canAssign(clusterResource, node)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to assign containers to child-queue of " + + getQueueName()); + } + + // Are we over maximum-capacity for this queue? 
+ // This will also consider parent's limits and also continuous reservation + // looking + if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(), + resourceLimits, Resources + .createResource(getMetrics().getReservedMB(), + getMetrics().getReservedVirtualCores()), schedulingMode)) { ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, getParentName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); if (rootQueue) { ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, node); } - return CSAssignment.NULL_ASSIGNMENT; + break; } - CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), - NodeType.NODE_LOCAL); + // Schedule + CSAssignment assignedToChild = assignContainersToChildQueues( + clusterResource, ps, resourceLimits, schedulingMode); + assignment.setType(assignedToChild.getType()); + assignment.setRequestLocalityType( + assignedToChild.getRequestLocalityType()); + assignment.setExcessReservation(assignedToChild.getExcessReservation()); + assignment.setContainersToKill(assignedToChild.getContainersToKill()); - while (canAssign(clusterResource, node)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to assign containers to child-queue of " - + getQueueName()); - } + // Done if no child-queue assigned anything + if (Resources.greaterThan(resourceCalculator, clusterResource, + assignedToChild.getResource(), Resources.none())) { - // Are we over maximum-capacity for this queue? 
- // This will also consider parent's limits and also continuous reservation - // looking - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), - resourceLimits, Resources - .createResource(getMetrics().getReservedMB(), - getMetrics().getReservedVirtualCores()), schedulingMode)) { - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + + boolean isReserved = + assignedToChild.getAssignmentInformation().getReservationDetails() + != null && !assignedToChild.getAssignmentInformation() + .getReservationDetails().isEmpty(); + if (node != null && !isReserved) { if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } - - break; - } - - // Schedule - CSAssignment assignedToChild = assignContainersToChildQueues( - clusterResource, node, resourceLimits, schedulingMode); - assignment.setType(assignedToChild.getType()); - - // Done if no child-queue assigned anything - if (Resources.greaterThan(resourceCalculator, clusterResource, - assignedToChild.getResource(), Resources.none())) { - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.ACCEPTED, - ActivityDiagnosticConstant.EMPTY); - - if (node.getReservedContainer() == null) { - if (rootQueue) { - ActivitiesLogger.NODE.finishAllocatedNodeAllocation( - activitiesManager, node, - assignedToChild.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId(), - AllocationState.ALLOCATED); - } - } else{ - if (rootQueue) { - ActivitiesLogger.NODE.finishAllocatedNodeAllocation( - activitiesManager, node, - assignedToChild.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId(), - 
AllocationState.RESERVED); - } + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.ALLOCATED); } - - // Track resource utilization for the parent-queue - allocateResource(clusterResource, assignedToChild.getResource(), - node.getPartition(), assignedToChild.isIncreasedAllocation()); - - // Track resource utilization in this pass of the scheduler - Resources.addTo(assignment.getResource(), - assignedToChild.getResource()); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - assignedToChild.getAssignmentInformation().getAllocated()); - Resources.addTo(assignment.getAssignmentInformation().getReserved(), - assignedToChild.getAssignmentInformation().getReserved()); - assignment.getAssignmentInformation().incrAllocations( - assignedToChild.getAssignmentInformation().getNumAllocations()); - assignment.getAssignmentInformation().incrReservations( - assignedToChild.getAssignmentInformation().getNumReservations()); - assignment.getAssignmentInformation().getAllocationDetails().addAll( - assignedToChild.getAssignmentInformation() - .getAllocationDetails()); - assignment.getAssignmentInformation().getReservationDetails().addAll( - assignedToChild.getAssignmentInformation() - .getReservationDetails()); - assignment.setIncreasedAllocation( - assignedToChild.isIncreasedAllocation()); - - LOG.info("assignedContainer" + " queue=" + getQueueName() - + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() - + " cluster=" + clusterResource); - } else{ - assignment.setSkippedType(assignedToChild.getSkippedType()); - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); if (rootQueue) { - 
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.RESERVED); } - - break; } + // Track resource utilization in this pass of the scheduler + Resources.addTo(assignment.getResource(), + assignedToChild.getResource()); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + assignedToChild.getAssignmentInformation().getAllocated()); + Resources.addTo(assignment.getAssignmentInformation().getReserved(), + assignedToChild.getAssignmentInformation().getReserved()); + assignment.getAssignmentInformation().incrAllocations( + assignedToChild.getAssignmentInformation().getNumAllocations()); + assignment.getAssignmentInformation().incrReservations( + assignedToChild.getAssignmentInformation().getNumReservations()); + assignment.getAssignmentInformation().getAllocationDetails().addAll( + assignedToChild.getAssignmentInformation() + .getAllocationDetails()); + assignment.getAssignmentInformation().getReservationDetails().addAll( + assignedToChild.getAssignmentInformation() + .getReservationDetails()); + assignment.setIncreasedAllocation( + assignedToChild.isIncreasedAllocation()); + + LOG.info("assignedContainer" + " queue=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + + " cluster=" + clusterResource); + if (LOG.isDebugEnabled()) { LOG.debug( "ParentQ=" + getQueueName() + " assignedSoFarInThisIteration=" @@ -592,39 +575,47 @@ public class ParentQueue extends AbstractCSQueue { + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity()); } + } else{ + assignment.setSkippedType(assignedToChild.getSkippedType()); - if (assignment.getType() == NodeType.OFF_SWITCH) { - offswitchCount++; + 
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); } - // Do not assign more containers if this isn't the root queue - // or if we've already assigned enough OFF_SWITCH containers in - // this pass - if (!rootQueue || offswitchCount >= getOffSwitchPerHeartbeatLimit()) { - if (LOG.isDebugEnabled()) { - if (rootQueue && offswitchCount >= getOffSwitchPerHeartbeatLimit()) { - LOG.debug("Assigned maximum number of off-switch containers: " + - offswitchCount + ", assignments so far: " + assignment); - } - } - break; - } + break; } - return assignment; - } finally { - writeLock.unlock(); + /* + * Previously here, we can allocate more than one container for each + * allocation under rootQ. Now this logic is not proper any more + * in global scheduling world. + * + * So here do not try to allocate more than one container for each + * allocation, let top scheduler make the decision. 
+ */ + break; } + + return assignment; } private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { + // When node == null means global scheduling is enabled, always return true + if (null == node) { + return true; + } + // Two conditions need to meet when trying to allocate: // 1) Node doesn't have reserved container // 2) Node's available-resource + killable-resource should > 0 return node.getReservedContainer() == null && Resources.greaterThanOrEqual( resourceCalculator, clusterResource, Resources - .add(node.getUnallocatedResource(), node.getTotalKillableResources()), - minimumAllocation); + .add(node.getUnallocatedResource(), + node.getTotalKillableResources()), minimumAllocation); } private ResourceLimits getResourceLimitsOfChild(CSQueue child, @@ -662,28 +653,20 @@ public class ParentQueue extends AbstractCSQueue { return new ResourceLimits(childLimit); } - - private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) { - if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { - if (needToResortQueuesAtNextAllocation) { - // If we skipped resort queues last time, we need to re-sort queue - // before allocation - List<CSQueue> childrenList = new ArrayList<>(childQueues); - childQueues.clear(); - childQueues.addAll(childrenList); - needToResortQueuesAtNextAllocation = false; - } - return childQueues.iterator(); - } - partitionQueueComparator.setPartitionToLookAt(node.getPartition()); + private Iterator<CSQueue> sortAndGetChildrenAllocationIterator( + String partition) { + // Previously we keep a sorted list for default partition, it is not good + // when multi-threading scheduler is enabled, so to make a simpler code + // now re-sort queue every time irrespective to node partition. 
+ partitionQueueComparator.setPartitionToLookAt(partition); List<CSQueue> childrenList = new ArrayList<>(childQueues); Collections.sort(childrenList, partitionQueueComparator); return childrenList.iterator(); } - - private CSAssignment assignContainersToChildQueues( - Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, + + private CSAssignment assignContainersToChildQueues(Resource cluster, + PlacementSet<FiCaSchedulerNode> ps, ResourceLimits limits, SchedulingMode schedulingMode) { CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; @@ -691,8 +674,8 @@ public class ParentQueue extends AbstractCSQueue { printChildQueues(); // Try to assign to most 'under-served' sub-queue - for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(node); iter - .hasNext();) { + for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator( + ps.getPartition()); iter.hasNext(); ) { CSQueue childQueue = iter.next(); if(LOG.isDebugEnabled()) { LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() @@ -702,9 +685,9 @@ public class ParentQueue extends AbstractCSQueue { // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster, parentLimits, - node.getPartition()); + ps.getPartition()); - CSAssignment childAssignment = childQueue.assignContainers(cluster, node, + CSAssignment childAssignment = childQueue.assignContainers(cluster, ps, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + @@ -712,22 +695,9 @@ public class ParentQueue extends AbstractCSQueue { childAssignment.getResource() + ", " + childAssignment.getType()); } - // If we do assign, remove the queue and re-insert in-order to re-sort if (Resources.greaterThan( resourceCalculator, cluster, childAssignment.getResource(), Resources.none())) { - // Only update childQueues when we doing non-partitioned node - // allocation. 
- if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) { - // Remove and re-insert to sort - iter.remove(); - LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() - + " stats: " + childQueue); - childQueues.add(childQueue); - if (LOG.isDebugEnabled()) { - printChildQueues(); - } - } assignment = childAssignment; break; } else if (childAssignment.getSkippedType() == @@ -770,10 +740,10 @@ public class ParentQueue extends AbstractCSQueue { + " child-queues: " + getChildQueuesToPrint()); } } - + private void internalReleaseResource(Resource clusterResource, - FiCaSchedulerNode node, Resource releasedResource, boolean changeResource, - CSQueue completedChildQueue, boolean sortQueues) { + FiCaSchedulerNode node, Resource releasedResource, + boolean changeResource) { try { writeLock.lock(); super.releaseResource(clusterResource, releasedResource, @@ -784,29 +754,6 @@ public class ParentQueue extends AbstractCSQueue { "completedContainer " + this + ", cluster=" + clusterResource); } - // Note that this is using an iterator on the childQueues so this can't - // be called if already within an iterator for the childQueues. Like - // from assignContainersToChildQueues. - if (sortQueues) { - // reinsert the updated queue - for (Iterator<CSQueue> iter = childQueues.iterator(); - iter.hasNext(); ) { - CSQueue csqueue = iter.next(); - if (csqueue.equals(completedChildQueue)) { - iter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Re-sorting completed queue: " + csqueue); - } - childQueues.add(csqueue); - break; - } - } - } - - // If we skipped sort queue this time, we need to resort queues to make - // sure we allocate from least usage (or order defined by queue policy) - // queues. 
- needToResortQueuesAtNextAllocation = !sortQueues; } finally { writeLock.unlock(); } @@ -821,8 +768,7 @@ public class ParentQueue extends AbstractCSQueue { Resources.negate(decreaseRequest.getDeltaCapacity()); internalReleaseResource(clusterResource, - csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false, - null, false); + csContext.getNode(decreaseRequest.getNodeId()), absDeltaCapacity, false); // Inform the parent if (parent != null) { @@ -835,7 +781,7 @@ public class ParentQueue extends AbstractCSQueue { FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) { if (app != null) { internalReleaseResource(clusterResource, node, - rmContainer.getReservedResource(), false, null, false); + rmContainer.getReservedResource(), false); // Inform the parent if (parent != null) { @@ -853,8 +799,7 @@ public class ParentQueue extends AbstractCSQueue { boolean sortQueues) { if (application != null) { internalReleaseResource(clusterResource, node, - rmContainer.getContainer().getResource(), false, completedChildQueue, - sortQueues); + rmContainer.getContainer().getResource(), false); // Inform the parent if (parent != null) { @@ -1062,4 +1007,37 @@ public class ParentQueue extends AbstractCSQueue { } } } + + public void apply(Resource cluster, + ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) { + if (request.anythingAllocatedOrReserved()) { + ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> + allocation = request.getFirstAllocatedOrReservedContainer(); + SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> + schedulerContainer = allocation.getAllocatedOrReservedContainer(); + + // Do not modify queue when allocation from reserved container + if (allocation.getAllocateFromReservedContainer() == null) { + try { + writeLock.lock(); + // Book-keeping + // Note: Update headroom to account for current allocation too... 
+ allocateResource(cluster, allocation.getAllocatedOrReservedResource(), + schedulerContainer.getNodePartition(), + allocation.isIncreasedAllocation()); + + LOG.info("assignedContainer" + " queue=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + + " cluster=" + cluster); + } finally { + writeLock.unlock(); + } + } + } + + if (parent != null) { + parent.apply(cluster, request); + } + } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index fa13df4..5bb91e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -77,11 +78,13 @@ public abstract class AbstractContainerAllocator { // Handle excess reservation assignment.setExcessReservation(result.getContainerToBeUnreserved()); + 
assignment.setRequestLocalityType(result.requestLocalityType); + // If we allocated something if (Resources.greaterThan(rc, clusterResource, result.getResourceToBeAllocated(), Resources.none())) { Resource allocatedResource = result.getResourceToBeAllocated(); - Container updatedContainer = result.getUpdatedContainer(); + RMContainer updatedContainer = result.getUpdatedContainer(); assignment.setResource(allocatedResource); assignment.setType(result.getContainerNodeType()); @@ -92,8 +95,7 @@ public abstract class AbstractContainerAllocator { + application.getApplicationId() + " resource=" + allocatedResource + " queue=" + this.toString() + " cluster=" + clusterResource); assignment.getAssignmentInformation().addReservationDetails( - updatedContainer.getId(), - application.getCSLeafQueue().getQueuePath()); + updatedContainer, application.getCSLeafQueue().getQueuePath()); assignment.getAssignmentInformation().incrReservations(); Resources.addTo(assignment.getAssignmentInformation().getReserved(), allocatedResource); @@ -111,41 +113,37 @@ public abstract class AbstractContainerAllocator { ActivityState.RESERVED); ActivitiesLogger.APP.finishAllocatedAppAllocationRecording( activitiesManager, application.getApplicationId(), - updatedContainer.getId(), ActivityState.RESERVED, + updatedContainer.getContainerId(), ActivityState.RESERVED, ActivityDiagnosticConstant.EMPTY); } } else if (result.getAllocationState() == AllocationState.ALLOCATED){ // This is a new container // Inform the ordering policy - LOG.info("assignedContainer" + " application attempt=" - + application.getApplicationAttemptId() + " container=" - + updatedContainer.getId() + " queue=" + this + " clusterResource=" + LOG.info("assignedContainer" + " application attempt=" + application + .getApplicationAttemptId() + " container=" + updatedContainer + .getContainerId() + " queue=" + this + " clusterResource=" + clusterResource + " type=" + assignment.getType()); - application - .getCSLeafQueue() - 
.getOrderingPolicy() - .containerAllocated(application, - application.getRMContainer(updatedContainer.getId())); - assignment.getAssignmentInformation().addAllocationDetails( - updatedContainer.getId(), - application.getCSLeafQueue().getQueuePath()); + updatedContainer, application.getCSLeafQueue().getQueuePath()); assignment.getAssignmentInformation().incrAllocations(); Resources.addTo(assignment.getAssignmentInformation().getAllocated(), allocatedResource); if (rmContainer != null) { assignment.setFulfilledReservation(true); + assignment.setFulfilledReservedContainer(rmContainer); } ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager, node, application, updatedContainer, ActivityState.ALLOCATED); ActivitiesLogger.APP.finishAllocatedAppAllocationRecording( activitiesManager, application.getApplicationId(), - updatedContainer.getId(), ActivityState.ACCEPTED, + updatedContainer.getContainerId(), ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + // Update unformed resource + application.incUnconfirmedRes(allocatedResource); } assignment.setContainersToKill(result.getToKillContainers()); @@ -170,8 +168,15 @@ public abstract class AbstractContainerAllocator { * <li>Do allocation: this will decide/create allocated/reserved * container, this will also update metrics</li> * </ul> + * + * @param clusterResource clusterResource + * @param ps PlacementSet + * @param schedulingMode scheduling mode (exclusive or nonexclusive) + * @param resourceLimits resourceLimits + * @param reservedContainer reservedContainer + * @return CSAssignemnt proposal */ public abstract CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer); } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java index 8f749f6..f408508 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; import java.util.List; @@ -57,8 +58,13 @@ public class ContainerAllocation { private Resource resourceToBeAllocated = Resources.none(); AllocationState state; NodeType containerNodeType = NodeType.NODE_LOCAL; - NodeType requestNodeType = NodeType.NODE_LOCAL; - Container updatedContainer; + NodeType requestLocalityType = null; + + /** + * When some (new) container allocated/reserved 
or some increase container + * request allocated/reserved, updatedContainer will be set. + */ + RMContainer updatedContainer; private List<RMContainer> toKillContainers; public ContainerAllocation(RMContainer containerToBeUnreserved, @@ -87,7 +93,7 @@ public class ContainerAllocation { return containerNodeType; } - public Container getUpdatedContainer() { + public RMContainer getUpdatedContainer() { return updatedContainer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java index 4eaa24b..57188d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java @@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; public class ContainerAllocator extends AbstractContainerAllocator { - AbstractContainerAllocator increaseContainerAllocator; - AbstractContainerAllocator regularContainerAllocator; + private AbstractContainerAllocator increaseContainerAllocator; + private AbstractContainerAllocator regularContainerAllocator; public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext) { @@ -52,17 +53,17 @@ public class ContainerAllocator extends AbstractContainerAllocator { @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { if (reservedContainer != null) { if (reservedContainer.getState() == RMContainerState.RESERVED) { // It's a regular container return regularContainerAllocator.assignContainers(clusterResource, - node, schedulingMode, resourceLimits, reservedContainer); + ps, schedulingMode, resourceLimits, reservedContainer); } else { // It's a increase container return increaseContainerAllocator.assignContainers(clusterResource, - node, schedulingMode, resourceLimits, reservedContainer); + ps, schedulingMode, resourceLimits, reservedContainer); } } else { /* @@ -70,14 +71,14 @@ public class ContainerAllocator extends AbstractContainerAllocator { * anything, we will try to allocate regular container */ CSAssignment assign = - increaseContainerAllocator.assignContainers(clusterResource, node, + increaseContainerAllocator.assignContainers(clusterResource, ps, schedulingMode, resourceLimits, null); if (Resources.greaterThan(rc, clusterResource, assign.getResource(), Resources.none())) { return assign; } - return 
regularContainerAllocator.assignContainers(clusterResource, node, + return regularContainerAllocator.assignContainers(clusterResource, ps, schedulingMode, resourceLimits, null); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 509dfba..74a64c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -18,12 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -36,16 +30,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + public class IncreaseContainerAllocator extends AbstractContainerAllocator { private static final Log LOG = LogFactory.getLog(IncreaseContainerAllocator.class); @@ -76,7 +75,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { request.getDeltaCapacity()); assignment.getAssignmentInformation().incrReservations(); assignment.getAssignmentInformation().addReservationDetails( - request.getContainerId(), application.getCSLeafQueue().getQueuePath()); + request.getRMContainer(), application.getCSLeafQueue().getQueuePath()); assignment.setIncreasedAllocation(true); LOG.info("Reserved increase container request:" + request.toString()); @@ -93,8 +92,12 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { request.getDeltaCapacity()); 
assignment.getAssignmentInformation().incrAllocations(); assignment.getAssignmentInformation().addAllocationDetails( - request.getContainerId(), application.getCSLeafQueue().getQueuePath()); + request.getRMContainer(), application.getCSLeafQueue().getQueuePath()); assignment.setIncreasedAllocation(true); + + if (fromReservation) { + assignment.setFulfilledReservedContainer(request.getRMContainer()); + } // notify application application @@ -114,19 +117,6 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { SchedContainerChangeRequest increaseRequest) { if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(), node.getUnallocatedResource())) { - // OK, we can allocate this increase request - // Unreserve it first - application.unreserve( - increaseRequest.getRMContainer().getAllocatedSchedulerKey(), - (FiCaSchedulerNode) node, increaseRequest.getRMContainer()); - - // Notify application - application.increaseContainer(increaseRequest); - - // Notify node - node.increaseContainer(increaseRequest.getContainerId(), - increaseRequest.getDeltaCapacity()); - return createSuccessfullyIncreasedCSAssignment(increaseRequest, true); } else { if (LOG.isDebugEnabled()) { @@ -144,40 +134,26 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { Resource cluster, SchedContainerChangeRequest increaseRequest) { if (Resources.fitsIn(rc, cluster, increaseRequest.getDeltaCapacity(), node.getUnallocatedResource())) { - // Notify node - node.increaseContainer(increaseRequest.getContainerId(), - increaseRequest.getDeltaCapacity()); - - // OK, we can allocate this increase request - // Notify application - application.increaseContainer(increaseRequest); return createSuccessfullyIncreasedCSAssignment(increaseRequest, false); - } else { - boolean reservationSucceeded = - application.reserveIncreasedContainer( - increaseRequest.getRMContainer().getAllocatedSchedulerKey(), - node, increaseRequest.getRMContainer(), - 
increaseRequest.getDeltaCapacity()); - - if (reservationSucceeded) { - // We cannot allocate this container, but since queue capacity / - // user-limit matches, we can reserve this container on this node. - return createReservedIncreasedCSAssignment(increaseRequest); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Reserve increase request=" + increaseRequest.toString() - + " failed. Skipping.."); - } - return CSAssignment.SKIP_ASSIGNMENT; - } + } else{ + // We cannot allocate this container, but since queue capacity / + // user-limit matches, we can reserve this container on this node. + return createReservedIncreasedCSAssignment(increaseRequest); } } @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { AppSchedulingInfo sinfo = application.getAppSchedulingInfo(); + FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + + if (null == node) { + // This is global scheduling enabled + // FIXME, support container increase when global scheduling enabled + return CSAssignment.SKIP_ASSIGNMENT; + } NodeId nodeId = node.getNodeID(); if (reservedContainer == null) { @@ -258,8 +234,6 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { } Iterator<Entry<ContainerId, SchedContainerChangeRequest>> iter = increaseRequestMap.entrySet().iterator(); - List<SchedContainerChangeRequest> toBeRemovedRequests = - new ArrayList<>(); while (iter.hasNext()) { Entry<ContainerId, SchedContainerChangeRequest> entry = @@ -289,7 +263,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { if (LOG.isDebugEnabled()) { LOG.debug(" Container is not running any more, skip..."); } - toBeRemovedRequests.add(increaseRequest); + application.addToBeRemovedIncreaseRequest(increaseRequest); continue; } @@ -304,7 +278,7 @@ public class 
IncreaseContainerAllocator extends AbstractContainerAllocator { LOG.debug(" Target capacity is more than what node can offer," + " node.resource=" + node.getTotalResource()); } - toBeRemovedRequests.add(increaseRequest); + application.addToBeRemovedIncreaseRequest(increaseRequest); continue; } @@ -318,15 +292,6 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { break; } } - - // Remove invalid in request requests - if (!toBeRemovedRequests.isEmpty()) { - for (SchedContainerChangeRequest req : toBeRemovedRequests) { - sinfo.removeIncreaseRequest(req.getNodeId(), - req.getRMContainer().getAllocatedSchedulerKey(), - req.getContainerId()); - } - } // We may have allocated something if (assigned != null && assigned.getSkippedType() http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 1a3f71f..3e8282f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -19,13 
+19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; @@ -50,6 +51,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssign import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -71,12 +75,12 @@ public class 
RegularContainerAllocator extends AbstractContainerAllocator { private boolean checkHeadroom(Resource clusterResource, ResourceLimits currentResourceLimits, Resource required, - FiCaSchedulerNode node) { + String nodePartition) { // If headroom + currentReservation < required, we cannot allocate this // require Resource resourceCouldBeUnReserved = application.getCurrentReservation(); if (!application.getCSLeafQueue().getReservationContinueLooking() - || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { + || !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { // If we don't allow reservation continuous looking, OR we're looking at // non-default node partition, we won't allow to unreserve before // allocation. @@ -87,20 +91,17 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { required); } - - private ContainerAllocation preCheckForNewContainer(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + /* + * Pre-check if we can allocate a pending resource request + * (given schedulerKey) to a given PlacementSet. + * We will consider stuffs like exclusivity, pending resource, node partition, + * headroom, etc. 
+ */ + private ContainerAllocation preCheckForPlacementSet(Resource clusterResource, + PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) { Priority priority = schedulerKey.getPriority(); - - if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) { - application.updateAppSkipNodeDiagnostics( - CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); - ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( - activitiesManager, node, application, priority, - ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE); - return ContainerAllocation.APP_SKIPPED; - } + FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); ResourceRequest anyRequest = application.getResourceRequest(schedulerKey, ResourceRequest.ANY); @@ -144,7 +145,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // matches the node's label? // If not match, jump to next priority. 
if (!SchedulerUtils.checkResourceRequestMatchingNodePartition( - anyRequest.getNodeLabelExpression(), node.getPartition(), + anyRequest.getNodeLabelExpression(), ps.getPartition(), schedulingMode)) { ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, priority, @@ -165,7 +166,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } } - if (!checkHeadroom(clusterResource, resourceLimits, required, node)) { + if (!checkHeadroom(clusterResource, resourceLimits, required, + ps.getPartition())) { if (LOG.isDebugEnabled()) { LOG.debug("cannot allocate required resource=" + required + " because of headroom"); @@ -176,9 +178,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { return ContainerAllocation.QUEUE_SKIPPED; } - // Inform the application it is about to get a scheduling opportunity - application.addSchedulingOpportunity(schedulerKey); - // Increase missed-non-partitioned-resource-request-opportunity. 
// This is to make sure non-partitioned-resource-request will prefer // to be allocated to non-partitioned nodes @@ -210,32 +209,43 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { return ContainerAllocation.APP_SKIPPED; } } - + return null; } - ContainerAllocation preAllocation(Resource clusterResource, + private ContainerAllocation checkIfNodeBlackListed(FiCaSchedulerNode node, + SchedulerRequestKey schedulerKey) { + Priority priority = schedulerKey.getPriority(); + + if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) { + application.updateAppSkipNodeDiagnostics( + CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE); + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE); + return ContainerAllocation.APP_SKIPPED; + } + + return null; + } + + ContainerAllocation tryAllocateOnNode(Resource clusterResource, FiCaSchedulerNode node, SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { ContainerAllocation result; - if (null == reservedContainer) { - // pre-check when allocating new container - result = - preCheckForNewContainer(clusterResource, node, schedulingMode, - resourceLimits, schedulerKey); - if (null != result) { - return result; - } - } else { - // pre-check when allocating reserved container - if (application.getTotalRequiredResources(schedulerKey) == 0) { - // Release - return new ContainerAllocation(reservedContainer, null, - AllocationState.QUEUE_SKIPPED); - } + + // Sanity checks before assigning to this node + result = checkIfNodeBlackListed(node, schedulerKey); + if (null != result) { + return result; } + // Inform the application it is about to get a scheduling opportunity + // TODO, we may need to revisit here to see if we should add scheduling + // opportunity here + 
application.addSchedulingOpportunity(schedulerKey); + // Try to allocate containers on node result = assignContainersOnNode(clusterResource, node, schedulerKey, @@ -383,20 +393,20 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { Priority priority = schedulerKey.getPriority(); ContainerAllocation allocation; + NodeType requestLocalityType = null; - NodeType requestType = null; // Data-local ResourceRequest nodeLocalResourceRequest = application.getResourceRequest(schedulerKey, node.getNodeName()); if (nodeLocalResourceRequest != null) { - requestType = NodeType.NODE_LOCAL; + requestLocalityType = NodeType.NODE_LOCAL; allocation = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, node, schedulerKey, reservedContainer, schedulingMode, currentResoureLimits); if (Resources.greaterThan(rc, clusterResource, allocation.getResourceToBeAllocated(), Resources.none())) { - allocation.requestNodeType = requestType; + allocation.requestLocalityType = requestLocalityType; return allocation; } } @@ -412,9 +422,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { return ContainerAllocation.PRIORITY_SKIPPED; } - if (requestType != NodeType.NODE_LOCAL) { - requestType = NodeType.RACK_LOCAL; - } + requestLocalityType = requestLocalityType == null ? 
+ NodeType.RACK_LOCAL : + requestLocalityType; allocation = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, @@ -422,7 +432,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { currentResoureLimits); if (Resources.greaterThan(rc, clusterResource, allocation.getResourceToBeAllocated(), Resources.none())) { - allocation.requestNodeType = requestType; + allocation.requestLocalityType = requestLocalityType; return allocation; } } @@ -437,22 +447,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY); return ContainerAllocation.PRIORITY_SKIPPED; } - if (requestType != NodeType.NODE_LOCAL - && requestType != NodeType.RACK_LOCAL) { - requestType = NodeType.OFF_SWITCH; - } + + requestLocalityType = requestLocalityType == null ? + NodeType.OFF_SWITCH : + requestLocalityType; allocation = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, node, schedulerKey, reservedContainer, schedulingMode, currentResoureLimits); - allocation.requestNodeType = requestType; - + // When a returned allocation is LOCALITY_SKIPPED, since we're in // off-switch request now, we will skip this app w.r.t priorities if (allocation.state == AllocationState.LOCALITY_SKIPPED) { allocation.state = AllocationState.APP_SKIPPED; } + allocation.requestLocalityType = requestLocalityType; return allocation; } @@ -671,33 +681,27 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { private Container createContainer(FiCaSchedulerNode node, Resource capability, SchedulerRequestKey schedulerKey) { - NodeId nodeId = node.getRMNode().getNodeID(); - ContainerId containerId = - BuilderUtils.newContainerId(application.getApplicationAttemptId(), - application.getNewContainerId()); // Create the container - return BuilderUtils.newContainer(containerId, nodeId, + // Now set the containerId to null first, because it is possible the + // 
container will be rejected because of concurrent resource allocation. + // new containerId will be generated and assigned to the container + // after confirmed. + return BuilderUtils.newContainer(null, nodeId, node.getRMNode().getHttpAddress(), capability, schedulerKey.getPriority(), null, schedulerKey.getAllocationRequestId()); } - + private ContainerAllocation handleNewContainerAllocation( ContainerAllocation allocationResult, FiCaSchedulerNode node, - SchedulerRequestKey schedulerKey, RMContainer reservedContainer, - Container container) { - // Handling container allocation - // Did we previously reserve containers at this 'priority'? - if (reservedContainer != null) { - application.unreserve(schedulerKey, node, reservedContainer); - } - + SchedulerRequestKey schedulerKey, Container container) { // Inform the application - RMContainer allocatedContainer = - application.allocate(allocationResult.containerNodeType, node, - schedulerKey, lastResourceRequest, container); + RMContainer allocatedContainer = application.allocate(node, schedulerKey, + lastResourceRequest, container); + + allocationResult.updatedContainer = allocatedContainer; // Does the application need this resource? 
if (allocatedContainer == null) { @@ -710,13 +714,6 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED); return ret; } - - // Inform the node - node.allocateContainer(allocatedContainer); - - // update locality statistics - application.incNumAllocatedContainers(allocationResult.containerNodeType, - allocationResult.requestNodeType); return allocationResult; } @@ -743,14 +740,18 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) { // When allocating container - allocationResult = - handleNewContainerAllocation(allocationResult, node, schedulerKey, - reservedContainer, container); + allocationResult = handleNewContainerAllocation(allocationResult, node, + schedulerKey, container); } else { // When reserving container - application.reserve(schedulerKey, node, reservedContainer, container); + RMContainer updatedContainer = reservedContainer; + if (updatedContainer == null) { + updatedContainer = new RMContainerImpl(container, + application.getApplicationAttemptId(), node.getNodeID(), + application.getAppSchedulingInfo().getUser(), rmContext); + } + allocationResult.updatedContainer = updatedContainer; } - allocationResult.updatedContainer = container; // Only reset opportunities when we FIRST allocate the container. 
(IAW, When // reservedContainer != null, it's not the first time) @@ -788,16 +789,46 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { } private ContainerAllocation allocate(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey, RMContainer reservedContainer) { - ContainerAllocation result = - preAllocation(clusterResource, node, schedulingMode, resourceLimits, - schedulerKey, reservedContainer); + // Do checks before determining which node to allocate + // Directly return if this check fails. + ContainerAllocation result; + if (reservedContainer == null) { + result = preCheckForPlacementSet(clusterResource, ps, schedulingMode, + resourceLimits, schedulerKey); + if (null != result) { + return result; + } + } else { + // pre-check when allocating reserved container + if (application.getTotalRequiredResources(schedulerKey) == 0) { + // Release + return new ContainerAllocation(reservedContainer, null, + AllocationState.QUEUE_SKIPPED); + } + } + + SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS = + application.getAppSchedulingInfo().getSchedulingPlacementSet( + schedulerKey); + + result = ContainerAllocation.PRIORITY_SKIPPED; - if (AllocationState.ALLOCATED == result.state - || AllocationState.RESERVED == result.state) { - result = doAllocation(result, node, schedulerKey, reservedContainer); + Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator( + ps); + while (iter.hasNext()) { + FiCaSchedulerNode node = iter.next(); + + result = tryAllocateOnNode(clusterResource, node, schedulingMode, + resourceLimits, schedulerKey, reservedContainer); + + if (AllocationState.ALLOCATED == result.state + || AllocationState.RESERVED == result.state) { + result = doAllocation(result, node, schedulerKey, reservedContainer); + break; + } } return result; @@ -805,17 +836,19 
@@ public class RegularContainerAllocator extends AbstractContainerAllocator { @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, SchedulingMode schedulingMode, + PlacementSet<FiCaSchedulerNode> ps, SchedulingMode schedulingMode, ResourceLimits resourceLimits, RMContainer reservedContainer) { + FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps); + if (reservedContainer == null) { // Check if application needs more resource, skip if it doesn't need more. if (!application.hasPendingResourceRequest(rc, - node.getPartition(), clusterResource, schedulingMode)) { + ps.getPartition(), clusterResource, schedulingMode)) { if (LOG.isDebugEnabled()) { LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId() + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-label=" + node.getPartition()); + + schedulingMode.name() + " node-label=" + ps.getPartition()); } ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( activitiesManager, node, application, application.getPriority(), @@ -826,7 +859,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { // Schedule in priority order for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) { ContainerAllocation result = - allocate(clusterResource, node, schedulingMode, resourceLimits, + allocate(clusterResource, ps, schedulingMode, resourceLimits, schedulerKey, null); AllocationState allocationState = result.getAllocationState(); @@ -845,7 +878,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator { return CSAssignment.SKIP_ASSIGNMENT; } else { ContainerAllocation result = - allocate(clusterResource, node, schedulingMode, resourceLimits, + allocate(clusterResource, ps, schedulingMode, resourceLimits, reservedContainer.getReservedSchedulerKey(), reservedContainer); return getCSAssignmentFromAllocateResult(clusterResource, result, reservedContainer, 
node); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java index aad3bc7..63d8a89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java @@ -38,11 +38,13 @@ public class AssignmentInformation { } public static class AssignmentDetails { + public RMContainer rmContainer; public ContainerId containerId; public String queue; - public AssignmentDetails(ContainerId containerId, String queue) { - this.containerId = containerId; + public AssignmentDetails(RMContainer rmContainer, String queue) { + this.containerId = rmContainer.getContainerId(); + this.rmContainer = rmContainer; this.queue = queue; } } @@ -58,7 +60,7 @@ public class AssignmentInformation { for (Operation op : Operation.values()) { operationCounts.put(op, 0); operationResources.put(op, Resource.newInstance(0, 0)); - operationDetails.put(op, new ArrayList<AssignmentDetails>()); + operationDetails.put(op, new ArrayList<>()); } } @@ -98,17 +100,17 @@ public class AssignmentInformation { return 
operationResources.get(Operation.RESERVATION); } - private void addAssignmentDetails(Operation op, ContainerId containerId, + private void addAssignmentDetails(Operation op, RMContainer rmContainer, String queue) { - operationDetails.get(op).add(new AssignmentDetails(containerId, queue)); + operationDetails.get(op).add(new AssignmentDetails(rmContainer, queue)); } - public void addAllocationDetails(ContainerId containerId, String queue) { - addAssignmentDetails(Operation.ALLOCATION, containerId, queue); + public void addAllocationDetails(RMContainer rmContainer, String queue) { + addAssignmentDetails(Operation.ALLOCATION, rmContainer, queue); } - public void addReservationDetails(ContainerId containerId, String queue) { - addAssignmentDetails(Operation.RESERVATION, containerId, queue); + public void addReservationDetails(RMContainer rmContainer, String queue) { + addAssignmentDetails(Operation.RESERVATION, rmContainer, queue); } public List<AssignmentDetails> getAllocationDetails() { @@ -119,23 +121,31 @@ public class AssignmentInformation { return operationDetails.get(Operation.RESERVATION); } - private ContainerId getFirstContainerIdFromOperation(Operation op) { + private RMContainer getFirstRMContainerFromOperation(Operation op) { if (null != operationDetails.get(op)) { List<AssignmentDetails> assignDetails = operationDetails.get(op); if (!assignDetails.isEmpty()) { - return assignDetails.get(0).containerId; + return assignDetails.get(0).rmContainer; } } return null; } + public RMContainer getFirstAllocatedOrReservedRMContainer() { + RMContainer rmContainer; + rmContainer = getFirstRMContainerFromOperation(Operation.ALLOCATION); + if (null != rmContainer) { + return rmContainer; + } + return getFirstRMContainerFromOperation(Operation.RESERVATION); + } + public ContainerId getFirstAllocatedOrReservedContainerId() { - ContainerId containerId; - containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION); - if (null != containerId) { - return containerId; 
+ RMContainer rmContainer = getFirstAllocatedOrReservedRMContainer(); + if (null != rmContainer) { + return rmContainer.getContainerId(); } - return getFirstContainerIdFromOperation(Operation.RESERVATION); + return null; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java new file mode 100644 index 0000000..ac83d6f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ContainerAllocationProposal.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; + +import java.util.Collections; +import java.util.List; + +/** + * Proposal to allocate/reserve a new container + */ +public class ContainerAllocationProposal<A extends SchedulerApplicationAttempt, + N extends SchedulerNode> { + // Container we allocated or reserved + private SchedulerContainer<A, N> allocatedOrReservedContainer; + + // Containers we need to release before allocating or reserving the + // new container + private List<SchedulerContainer<A, N>> toRelease = Collections.emptyList(); + + // When trying to allocate from a reserved container, set this, and this will + // not be included by toRelease list + private SchedulerContainer<A, N> allocateFromReservedContainer; + + private boolean isIncreasedAllocation; + + private NodeType allocationLocalityType; + + private NodeType requestLocalityType; + + private SchedulingMode schedulingMode; + + private Resource allocatedResource; // newly allocated resource + + public ContainerAllocationProposal( + SchedulerContainer<A, N> allocatedOrReservedContainer, + List<SchedulerContainer<A, N>> toRelease, + SchedulerContainer<A, N> allocateFromReservedContainer, + boolean isIncreasedAllocation, NodeType allocationLocalityType, + NodeType requestLocalityType, SchedulingMode schedulingMode, + Resource allocatedResource) { + this.allocatedOrReservedContainer = allocatedOrReservedContainer; + if (null != toRelease) { + this.toRelease = toRelease; + } + 
this.allocateFromReservedContainer = allocateFromReservedContainer; + this.isIncreasedAllocation = isIncreasedAllocation; + this.allocationLocalityType = allocationLocalityType; + this.requestLocalityType = requestLocalityType; + this.schedulingMode = schedulingMode; + this.allocatedResource = allocatedResource; + } + + public SchedulingMode getSchedulingMode() { + return schedulingMode; + } + + public Resource getAllocatedOrReservedResource() { + return allocatedResource; + } + + public NodeType getAllocationLocalityType() { + return allocationLocalityType; + } + + public boolean isIncreasedAllocation() { + return isIncreasedAllocation; + } + + public SchedulerContainer<A, N> getAllocateFromReservedContainer() { + return allocateFromReservedContainer; + } + + public SchedulerContainer<A, N> getAllocatedOrReservedContainer() { + return allocatedOrReservedContainer; + } + + public List<SchedulerContainer<A, N>> getToRelease() { + return toRelease; + } + + @Override + public String toString() { + return allocatedOrReservedContainer.toString(); + } + + public NodeType getRequestLocalityType() { + return requestLocalityType; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java new file mode 100644 index 0000000..bdea97d --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * Scheduler should implement this interface if it wants to have multi-threading + * plus global scheduling functionality + */ +public interface ResourceAllocationCommitter { + void tryCommit(Resource cluster, ResourceCommitRequest proposal); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceCommitRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceCommitRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceCommitRequest.java new file mode 100644 index 0000000..5aca202 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceCommitRequest.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.Collections; +import java.util.List; + +public class ResourceCommitRequest<A extends SchedulerApplicationAttempt, + N extends SchedulerNode> { + // New containers to be allocated + private List<ContainerAllocationProposal<A, N>> containersToAllocate = + Collections.emptyList(); + + // New containers to be reserved + private List<ContainerAllocationProposal<A, N>> containersToReserve = + Collections.emptyList(); + + // We don't need these containers anymore + private List<SchedulerContainer<A, N>> toReleaseContainers = + Collections.emptyList(); + + private Resource totalAllocatedResource; + private Resource totalReservedResource; + private Resource totalReleasedResource; + + public ResourceCommitRequest( + List<ContainerAllocationProposal<A, N>> containersToAllocate, + List<ContainerAllocationProposal<A, N>> containersToReserve, + List<SchedulerContainer<A, N>> toReleaseContainers) { + if (null != containersToAllocate) { + this.containersToAllocate = containersToAllocate; + } + if (null != containersToReserve) { + this.containersToReserve = containersToReserve; + } + if (null != toReleaseContainers) { + this.toReleaseContainers = toReleaseContainers; + } + 
totalAllocatedResource = Resources.createResource(0); + totalReservedResource = Resources.createResource(0); + + /* + * For total-release resource, it has two parts: + * 1) Unconditional release: for example, an app reserved a container, + * but the app doesn't have any pending resource. + * 2) Conditional release: for example, reservation continuous looking, or + * Lazy preemption -- which we need to kill some resource to allocate + * or reserve the new container. + * + * For the 2nd part, it is inside: + * ContainerAllocationProposal#toRelease, which means we will kill/release + * these containers to allocate/reserve the given container. + * + * So we need to account for both conditional and unconditional to-release + * containers in the total release-able resource. + */ + totalReleasedResource = Resources.createResource(0); + + for (ContainerAllocationProposal<A,N> c : this.containersToAllocate) { + Resources.addTo(totalAllocatedResource, + c.getAllocatedOrReservedResource()); + for (SchedulerContainer<A,N> r : c.getToRelease()) { + Resources.addTo(totalReleasedResource, + r.getRmContainer().getAllocatedOrReservedResource()); + } + } + + for (ContainerAllocationProposal<A,N> c : this.containersToReserve) { + Resources.addTo(totalReservedResource, + c.getAllocatedOrReservedResource()); + for (SchedulerContainer<A,N> r : c.getToRelease()) { + Resources.addTo(totalReleasedResource, + r.getRmContainer().getAllocatedOrReservedResource()); + } + } + + for (SchedulerContainer<A,N> r : this.toReleaseContainers) { + Resources.addTo(totalReleasedResource, + r.getRmContainer().getAllocatedOrReservedResource()); + } + } + + public List<ContainerAllocationProposal<A, N>> getContainersToAllocate() { + return containersToAllocate; + } + + public List<ContainerAllocationProposal<A, N>> getContainersToReserve() { + return containersToReserve; + } + + public List<SchedulerContainer<A, N>> getContainersToRelease() { + return toReleaseContainers; + } + + public Resource 
getTotalAllocatedResource() { + return totalAllocatedResource; + } + + public Resource getTotalReservedResource() { + return totalReservedResource; + } + + public Resource getTotalReleasedResource() { + return totalReleasedResource; + } + + /* + * Util functions to make your life easier + */ + public boolean anythingAllocatedOrReserved() { + return (!containersToAllocate.isEmpty()) || (!containersToReserve + .isEmpty()); + } + + public ContainerAllocationProposal<A, N> getFirstAllocatedOrReservedContainer() { + ContainerAllocationProposal<A, N> c = null; + if (!containersToAllocate.isEmpty()) { + c = containersToAllocate.get(0); + } + if (c == null && !containersToReserve.isEmpty()) { + c = containersToReserve.get(0); + } + + return c; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("New " + getClass().getName() + ":" + "\n"); + if (null != containersToAllocate && !containersToAllocate.isEmpty()) { + sb.append("\t ALLOCATED=" + containersToAllocate.toString()); + } + if (null != containersToReserve && !containersToReserve.isEmpty()) { + sb.append("\t RESERVED=" + containersToReserve.toString()); + } + if (null != toReleaseContainers && !toReleaseContainers.isEmpty()) { + sb.append("\t RELEASED=" + toReleaseContainers.toString()); + } + return sb.toString(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
