http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
new file mode 100644
index 0000000..8b4907b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+/**
+ * Contexts for a container inside scheduler: bundles an {@link RMContainer}
+ * with the application attempt, node and node partition it was created
+ * with, plus a flag telling an allocation apart from a reservation.
+ *
+ * @param <A> type of the scheduler application attempt
+ * @param <N> type of the scheduler node
+ */
+public class SchedulerContainer<A extends SchedulerApplicationAttempt,
+    N extends SchedulerNode> {
+  // All fields are assigned once in the constructor and never modified.
+  private final RMContainer rmContainer;
+  private final String nodePartition;
+  private final A schedulerApplicationAttempt;
+  private final N schedulerNode;
+  private final boolean allocated; // Allocated (True) or reserved (False)
+
+  /**
+   * @param app application attempt of the container
+   * @param node node of the container
+   * @param rmContainer the wrapped container
+   * @param nodePartition partition recorded for the container
+   * @param allocated true for an allocated container, false for a reserved
+   *          one
+   */
+  public SchedulerContainer(A app, N node, RMContainer rmContainer,
+      String nodePartition, boolean allocated) {
+    this.schedulerApplicationAttempt = app;
+    this.schedulerNode = node;
+    this.rmContainer = rmContainer;
+    this.nodePartition = nodePartition;
+    this.allocated = allocated;
+  }
+
+  public String getNodePartition() {
+    return nodePartition;
+  }
+
+  public RMContainer getRmContainer() {
+    return rmContainer;
+  }
+
+  public A getSchedulerApplicationAttempt() {
+    return schedulerApplicationAttempt;
+  }
+
+  public N getSchedulerNode() {
+    return schedulerNode;
+  }
+
+  public boolean isAllocated() {
+    return allocated;
+  }
+
+  /**
+   * @return the reserved scheduler key while the wrapped container is in
+   *         RESERVED state, otherwise its allocated scheduler key
+   */
+  public SchedulerRequestKey getSchedulerRequestKey() {
+    if (rmContainer.getState() == RMContainerState.RESERVED) {
+      return rmContainer.getReservedSchedulerKey();
+    }
+    return rmContainer.getAllocatedSchedulerKey();
+  }
+
+  @Override
+  public String toString() {
+    return "(Application=" + schedulerApplicationAttempt
+        .getApplicationAttemptId() + "; Node=" + schedulerNode.getNodeID()
+        + "; Resource=" + rmContainer.getAllocatedOrReservedResource() + ")";
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index ebe70d4..6d9dda8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -18,14 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -51,12 +44,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@@ -70,11 +64,25 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCap
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -101,6 +109,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    */
   private String appSkipNodeDiagnostics;
 
+  private Map<ContainerId, SchedContainerChangeRequest> toBeRemovedIncRequests =
+      new ConcurrentHashMap<>();
+
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
@@ -193,11 +204,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
   }
 
-  public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
+  public RMContainer allocate(FiCaSchedulerNode node,
       SchedulerRequestKey schedulerKey, ResourceRequest request,
       Container container) {
     try {
-      writeLock.lock();
+      readLock.lock();
 
       if (isStopped) {
         return null;
@@ -216,41 +227,440 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           request.getNodeLabelExpression());
       ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
 
+      // FIXME, should set when confirmed
       updateAMContainerDiagnostics(AMState.ASSIGNED, null);
 
-      // Add it to allContainers list.
-      newlyAllocatedContainers.add(rmContainer);
+      return rmContainer;
+    } finally {
+      readLock.unlock();
+    }
+  }
 
-      ContainerId containerId = container.getId();
-      liveContainers.put(containerId, rmContainer);
+  private boolean rmContainerInFinalState(RMContainer rmContainer) {
+    if (null == rmContainer) {
+      return false;
+    }
 
-      // Update consumption and track allocations
-      List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
-          type, node, schedulerKey, request, container);
+    return rmContainer.completed();
+  }
 
-      attemptResourceUsage.incUsed(node.getPartition(),
-          container.getResource());
+  private boolean anyContainerInFinalState(
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request
+        .getContainersToRelease()) {
+      if (rmContainerInFinalState(c.getRmContainer())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("To-release container=" + c.getRmContainer()
+              + " is in final state");
+        }
+        return true;
+      }
+    }
 
-      // Update resource requests related to "request" and store in RMContainer
-      ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+    for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> c : request
+        .getContainersToAllocate()) {
+      for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> r : c
+          .getToRelease()) {
+        if (rmContainerInFinalState(r.getRmContainer())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("To-release container=" + r.getRmContainer()
+                + ", for to a new allocated container, is in final state");
+          }
+          return true;
+        }
+      }
 
-      // Inform the container
-      rmContainer.handle(
-          new RMContainerEvent(containerId, RMContainerEventType.START));
+      if (null != c.getAllocateFromReservedContainer()) {
+        if (rmContainerInFinalState(
+            c.getAllocateFromReservedContainer().getRmContainer())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Allocate from reserved container" + c
+                .getAllocateFromReservedContainer().getRmContainer()
+                + " is in final state");
+          }
+          return true;
+        }
+      }
+    }
 
+    for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> c : request
+        .getContainersToReserve()) {
+      for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> r : c
+          .getToRelease()) {
+        if (rmContainerInFinalState(r.getRmContainer())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("To-release container=" + r.getRmContainer()
+                + ", for a reserved container, is in final state");
+          }
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  private SchedContainerChangeRequest getResourceChangeRequest(
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
+    return appSchedulingInfo.getIncreaseRequest(
+        schedulerContainer.getSchedulerNode().getNodeID(),
+        schedulerContainer.getSchedulerRequestKey(),
+        schedulerContainer.getRmContainer().getContainerId());
+  }
+
+  private boolean checkIncreaseContainerAllocation(
+      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
+    // When increase a container
+    if (schedulerContainer.getRmContainer().getState()
+        != RMContainerState.RUNNING) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate: applicationAttemptId=" + containerId
-            .getApplicationAttemptId() + " container=" + containerId + " host="
-            + container.getNodeId().getHost() + " type=" + type);
+        LOG.debug("Trying to increase a container, but container="
+            + schedulerContainer.getRmContainer().getContainerId()
+            + " is not in running state.");
       }
-      RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
-          "SchedulerApp", getApplicationId(), containerId,
-          container.getResource());
+      return false;
+    }
 
-      return rmContainer;
+    // Check if increase request is still valid
+    SchedContainerChangeRequest increaseRequest = getResourceChangeRequest(
+        schedulerContainer);
+
+    if (null == increaseRequest || !Resources.equals(
+        increaseRequest.getDeltaCapacity(),
+        allocation.getAllocatedOrReservedResource())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Increase request has been changed, reject this proposal");
+      }
+      return false;
+    }
+
+    if (allocation.getAllocateFromReservedContainer() != null) {
+      // In addition, if allocation is from a reserved container, check
+      // if the reserved container has enough reserved space
+      if (!Resources.equals(
+          allocation.getAllocateFromReservedContainer().getRmContainer()
+              .getReservedResource(), increaseRequest.getDeltaCapacity())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Checks shared by regular and increase allocations: the node must not be
+   * reserved by a container other than the one the proposal allocates from,
+   * and the node's unallocated resource, plus non-reserved to-release
+   * containers on the same node, must fit the proposed resource.
+   *
+   * @param cluster cluster resource, handed to the resource calculator
+   * @param allocation proposal being checked
+   * @param schedulerContainer container the proposal allocates
+   * @return true if both checks pass, false otherwise
+   */
+  private boolean commonCheckContainerAllocation(
+      Resource cluster,
+      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
+    // Make sure node is not reserved by anyone else
+    RMContainer reservedContainerOnNode =
+        schedulerContainer.getSchedulerNode().getReservedContainer();
+    if (reservedContainerOnNode != null) {
+      // A proposal that is not allocated from a reserved container has no
+      // claim on this node's reservation; treat it as a mismatch instead of
+      // dereferencing null
+      RMContainer fromReservedContainer =
+          allocation.getAllocateFromReservedContainer() == null ? null
+              : allocation.getAllocateFromReservedContainer().getRmContainer();
+
+      if (fromReservedContainer != reservedContainerOnNode) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Try to allocate from a non-existed reserved container");
+        }
+        return false;
+      }
+    }
+
+    // Do we have enough space on this node?
+    Resource availableResource = Resources.clone(
+        schedulerContainer.getSchedulerNode().getUnallocatedResource());
+
+    // If we have any to-release container in non-reserved state, they are
+    // from lazy-preemption, add their consumption to available resource
+    // of this node
+    if (allocation.getToRelease() != null && !allocation.getToRelease()
+        .isEmpty()) {
+      for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+          releaseContainer : allocation.getToRelease()) {
+        // Only consider non-reserved container (reserved container will
+        // not affect available resource of node) on the same node
+        if (releaseContainer.getRmContainer().getState()
+            != RMContainerState.RESERVED
+            && releaseContainer.getSchedulerNode() == schedulerContainer
+            .getSchedulerNode()) {
+          Resources.addTo(availableResource,
+              releaseContainer.getRmContainer().getAllocatedResource());
+        }
+      }
+    }
+    if (!Resources.fitsIn(rc, cluster,
+        allocation.getAllocatedOrReservedResource(),
+        availableResource)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Node doesn't have enough available resource, asked="
+            + allocation.getAllocatedOrReservedResource() + " available="
+            + availableResource);
+      }
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Checks whether a commit request is still valid for this application
+   * and, unless it is a re-reservation, for its leaf queue.
+   *
+   * @param cluster cluster resource
+   * @param request commit request to check
+   * @return true if the request is accepted, false if it is rejected
+   */
+  public boolean accept(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    List<ResourceRequest> resourceRequests = null;
+    boolean reReservation = false;
+
+    try {
+      readLock.lock();
+
+      // First make sure no container in release list in final state
+      if (anyContainerInFinalState(request)) {
+        return false;
+      }
+
+      // TODO, make sure all scheduler nodes are existed
+      // TODO, make sure all node labels are not changed
+
+      if (request.anythingAllocatedOrReserved()) {
+        /*
+         * 1) If this is a newly allocated container, check if the node is reserved
+         *    / not-reserved by any other application
+         * 2) If this is a newly reserved container, check if the node is reserved or not
+         */
+        // Assume we have only one container allocated or reserved
+        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
+            allocation = request.getFirstAllocatedOrReservedContainer();
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = allocation.getAllocatedOrReservedContainer();
+
+        if (schedulerContainer.isAllocated()) {
+          if (!allocation.isIncreasedAllocation()) {
+            // When allocate a new container
+            resourceRequests =
+                schedulerContainer.getRmContainer().getResourceRequests();
+
+            // Check pending resource request
+            if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
+                schedulerContainer.getSchedulerNode(),
+                schedulerContainer.getSchedulerRequestKey())) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("No pending resource for: nodeType=" + allocation
+                    .getAllocationLocalityType() + ", node=" + schedulerContainer
+                    .getSchedulerNode() + ", requestKey=" + schedulerContainer
+                    .getSchedulerRequestKey() + ", application="
+                    + getApplicationAttemptId());
+              }
+
+              return false;
+            }
+          } else {
+            if (!checkIncreaseContainerAllocation(allocation,
+                schedulerContainer)) {
+              return false;
+            }
+          }
+
+          // Common part of check container allocation regardless if it is a
+          // increase container or regular container
+          if (!commonCheckContainerAllocation(cluster, allocation,
+              schedulerContainer)) {
+            return false;
+          }
+        } else {
+          // Container reserved first time will be NEW, after the container
+          // accepted & confirmed, it will become RESERVED state
+          if (schedulerContainer.getRmContainer().getState()
+              == RMContainerState.RESERVED) {
+            // Set reReservation == true
+            reReservation = true;
+          } else {
+            // When reserve a resource (state == NEW is for new container,
+            // state == RUNNING is for increase container).
+            // Just check if the node is not already reserved by someone
+            if (schedulerContainer.getSchedulerNode().getReservedContainer()
+                != null) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Try to reserve a container, but the node is "
+                    + "already reserved by another container="
+                    + schedulerContainer.getSchedulerNode()
+                    .getReservedContainer().getContainerId());
+              }
+              return false;
+            }
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    // Skip check parent if this is a re-reservation container
+    boolean accepted = true;
+    if (!reReservation) {
+      // Check parent if anything allocated or reserved
+      if (request.anythingAllocatedOrReserved()) {
+        accepted = getCSLeafQueue().accept(cluster, request);
+      }
+    }
+
+    // When rejected, recover resource requests for this app
+    if (!accepted && resourceRequests != null) {
+      recoverResourceRequestsForContainer(resourceRequests);
+    }
+
+    return accepted;
+  }
+
+  /**
+   * Applies a commit request to this application under the write lock, then
+   * forwards it to the leaf queue unless it is a re-reservation.
+   *
+   * @param cluster cluster resource
+   * @param request commit request to apply
+   */
+  public void apply(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    boolean reReservation = false;
+
+    try {
+      writeLock.lock();
+
+      // If we allocated something
+      if (request.anythingAllocatedOrReserved()) {
+        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
+            allocation = request.getFirstAllocatedOrReservedContainer();
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = allocation.getAllocatedOrReservedContainer();
+        RMContainer rmContainer = schedulerContainer.getRmContainer();
+
+        reReservation =
+            (!schedulerContainer.isAllocated()) && (rmContainer.getState()
+                == RMContainerState.RESERVED);
+
+        // Generate new containerId if it is not an allocation for increasing
+        // Or re-reservation
+        if (!allocation.isIncreasedAllocation()) {
+          if (rmContainer.getContainer().getId() == null) {
+            rmContainer.setContainerId(BuilderUtils
+                .newContainerId(getApplicationAttemptId(),
+                    getNewContainerId()));
+          }
+        }
+        ContainerId containerId = rmContainer.getContainerId();
+
+        if (schedulerContainer.isAllocated()) {
+          // This allocation is from a reserved container
+          // Unreserve it first
+          if (allocation.getAllocateFromReservedContainer() != null) {
+            RMContainer reservedContainer =
+                allocation.getAllocateFromReservedContainer().getRmContainer();
+            // Handling container allocation
+            // Did we previously reserve containers at this 'priority'?
+            unreserve(schedulerContainer.getSchedulerRequestKey(),
+                schedulerContainer.getSchedulerNode(), reservedContainer);
+          }
+
+          // Update this application for the allocated container
+          if (!allocation.isIncreasedAllocation()) {
+            // Allocate a new container
+            newlyAllocatedContainers.add(rmContainer);
+            liveContainers.put(containerId, rmContainer);
+
+            // Deduct pending resource requests
+            List<ResourceRequest> requests = appSchedulingInfo.allocate(
+                allocation.getAllocationLocalityType(), schedulerContainer.getSchedulerNode(),
+                schedulerContainer.getSchedulerRequestKey(),
+                schedulerContainer.getRmContainer().getContainer());
+            ((RMContainerImpl) rmContainer).setResourceRequests(requests);
+
+            attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
+                allocation.getAllocatedOrReservedResource());
+
+            rmContainer.handle(
+                new RMContainerEvent(containerId, RMContainerEventType.START));
+
+            // Inform the node
+            schedulerContainer.getSchedulerNode().allocateContainer(
+                rmContainer);
+
+            // update locality statistics,
+            incNumAllocatedContainers(allocation.getAllocationLocalityType(),
+                allocation.getRequestLocalityType());
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("allocate: applicationAttemptId=" + containerId
+                  .getApplicationAttemptId() + " container=" + containerId
+                  + " host=" + rmContainer.getAllocatedNode().getHost()
+                  + " type=" + allocation.getAllocationLocalityType());
+            }
+            RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER,
+                "SchedulerApp", getApplicationId(), containerId,
+                allocation.getAllocatedOrReservedResource());
+          } else{
+            SchedContainerChangeRequest increaseRequest =
+                getResourceChangeRequest(schedulerContainer);
+
+            // allocate resource for an increase request
+            // Notify node
+            schedulerContainer.getSchedulerNode().increaseContainer(
+                increaseRequest.getContainerId(),
+                increaseRequest.getDeltaCapacity());
+
+            // OK, we can allocate this increase request
+            // Notify application
+            increaseContainer(increaseRequest);
+          }
+        } else {
+          if (!allocation.isIncreasedAllocation()) {
+            // If the rmContainer's state is already updated to RESERVED, this is
+            // a reReservation
+            reserve(schedulerContainer.getSchedulerRequestKey(),
+                schedulerContainer.getSchedulerNode(),
+                schedulerContainer.getRmContainer(),
+                schedulerContainer.getRmContainer().getContainer(),
+                reReservation);
+          } else{
+            SchedContainerChangeRequest increaseRequest =
+                getResourceChangeRequest(schedulerContainer);
+
+            reserveIncreasedContainer(
+                schedulerContainer.getSchedulerRequestKey(),
+                schedulerContainer.getSchedulerNode(),
+                increaseRequest.getRMContainer(),
+                increaseRequest.getDeltaCapacity());
+          }
+        }
+      }
     } finally {
       writeLock.unlock();
     }
+
+    // Don't bother CS leaf queue if it is a re-reservation
+    if (!reReservation) {
+      getCSLeafQueue().apply(cluster, request);
+    }
   }
 
   public boolean unreserve(SchedulerRequestKey schedulerKey,
@@ -347,9 +757,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
    * of the resources that will be allocated to and preempted from this
    * application.
    *
-   * @param resourceCalculator
-   * @param clusterResource
-   * @param minimumAllocation
+   * @param resourceCalculator resourceCalculator
+   * @param clusterResource clusterResource
+   * @param minimumAllocation minimumAllocation
    * @return an allocation
    */
   public Allocation getAllocation(ResourceCalculator resourceCalculator,
@@ -386,45 +796,40 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   public NodeId getNodeIdToUnreserve(
       SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
       ResourceCalculator rc, Resource clusterResource) {
-    try {
-      writeLock.lock();
-      // first go around make this algorithm simple and just grab first
-      // reservation that has enough resources
-      Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
-          schedulerKey);
-
-      if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
-        for (Map.Entry<NodeId, RMContainer> entry : reservedContainers
-            .entrySet()) {
-          NodeId nodeId = entry.getKey();
-          RMContainer reservedContainer = entry.getValue();
-          if (reservedContainer.hasIncreaseReservation()) {
-            // Currently, only regular container allocation supports continuous
-            // reservation looking, we don't support canceling increase request
-            // reservation when allocating regular container.
-            continue;
-          }
-
-          Resource reservedResource = reservedContainer.getReservedResource();
+    // first go around make this algorithm simple and just grab first
+    // reservation that has enough resources
+    Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
+        schedulerKey);
+
+    if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
+      for (Map.Entry<NodeId, RMContainer> entry : reservedContainers
+          .entrySet()) {
+        NodeId nodeId = entry.getKey();
+        RMContainer reservedContainer = entry.getValue();
+        if (reservedContainer.hasIncreaseReservation()) {
+          // Currently, only regular container allocation supports continuous
+          // reservation looking, we don't support canceling increase request
+          // reservation when allocating regular container.
+          continue;
+        }
 
-          // make sure we unreserve one with at least the same amount of
-          // resources, otherwise could affect capacity limits
-          if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
-              reservedResource)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "unreserving node with reservation size: " + reservedResource
-                      + " in order to allocate container with size: "
-                      + resourceNeedUnreserve);
-            }
-            return nodeId;
+        Resource reservedResource = reservedContainer.getReservedResource();
+
+        // make sure we unreserve one with at least the same amount of
+        // resources, otherwise could affect capacity limits
+        if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
+            reservedResource)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "unreserving node with reservation size: " + reservedResource
+                    + " in order to allocate container with size: "
+                    + resourceNeedUnreserve);
           }
+          return nodeId;
         }
       }
-      return null;
-    } finally {
-      writeLock.unlock();
     }
+    return null;
   }
 
   public void setHeadroomProvider(
@@ -482,10 +887,11 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return false;
   }
 
-  public void reserve(SchedulerRequestKey schedulerKey,
-      FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
+  public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node,
+      RMContainer rmContainer, Container container, boolean reReservation) {
     // Update reserved metrics if this is the first reservation
-    if (rmContainer == null) {
+    // rmContainer will be moved to reserved in the super.reserve
+    if (!reReservation) {
       queue.getMetrics().reserveResource(
           getUser(), container.getResource());
     }
@@ -501,35 +907,39 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   public RMContainer findNodeToUnreserve(Resource clusterResource,
       FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
       Resource minimumUnreservedResource) {
-    // need to unreserve some other container first
-    NodeId idToUnreserve =
-        getNodeIdToUnreserve(schedulerKey, minimumUnreservedResource,
-            rc, clusterResource);
-    if (idToUnreserve == null) {
+    try {
+      readLock.lock();
+      // need to unreserve some other container first
+      NodeId idToUnreserve = getNodeIdToUnreserve(schedulerKey,
+          minimumUnreservedResource, rc, clusterResource);
+      if (idToUnreserve == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("checked to see if could unreserve for app but nothing "
+              + "reserved that matches for this app");
+        }
+        return null;
+      }
+      FiCaSchedulerNode nodeToUnreserve =
+          ((CapacityScheduler) scheduler).getNode(idToUnreserve);
+      if (nodeToUnreserve == null) {
+        LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
+        return null;
+      }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("checked to see if could unreserve for app but nothing "
-            + "reserved that matches for this app");
+        LOG.debug("unreserving for app: " + getApplicationId() + " on nodeId: "
+            + idToUnreserve
+            + " in order to replace reserved application and place it on node: "
+            + node.getNodeID() + " needing: " + minimumUnreservedResource);
       }
-      return null;
-    }
-    FiCaSchedulerNode nodeToUnreserve =
-        ((CapacityScheduler) scheduler).getNode(idToUnreserve);
-    if (nodeToUnreserve == null) {
-      LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
-      return null;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("unreserving for app: " + getApplicationId()
-          + " on nodeId: " + idToUnreserve
-          + " in order to replace reserved application and place it on node: "
-          + node.getNodeID() + " needing: " + minimumUnreservedResource);
-    }
 
-    // headroom
-    Resources.addTo(getHeadroom(), nodeToUnreserve
-        .getReservedContainer().getReservedResource());
+      // headroom
+      Resources.addTo(getHeadroom(),
+          nodeToUnreserve.getReservedContainer().getReservedResource());
 
-    return nodeToUnreserve.getReservedContainer();
+      return nodeToUnreserve.getReservedContainer();
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public LeafQueue getCSLeafQueue() {
@@ -537,7 +947,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
+      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode, RMContainer reservedContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("pre-assignContainers for application "
@@ -545,13 +955,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       showRequests();
     }
 
-    try {
-      writeLock.lock();
-      return containerAllocator.assignContainers(clusterResource, node,
-          schedulingMode, currentResourceLimits, reservedContainer);
-    } finally {
-      writeLock.unlock();
-    }
+    return containerAllocator.assignContainers(clusterResource, ps,
+        schedulingMode, currentResourceLimits, reservedContainer);
   }
 
   public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition,
@@ -626,13 +1031,18 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   /**
    * Set the message temporarily if the reason is known for why scheduling did
    * not happen for a given node, if not message will be over written
-   * @param message
+   * @param message Message of app skip diagnostics
    */
   public void updateAppSkipNodeDiagnostics(String message) {
     this.appSkipNodeDiagnostics = message;
   }
 
   public void updateNodeInfoForAMDiagnostics(FiCaSchedulerNode node) {
+    // FIXME, update AM diagnostics when global scheduling is enabled
+    if (null == node) {
+      return;
+    }
+
     if (isWaitingForAMContainer()) {
       StringBuilder diagnosticMessageBldr = new StringBuilder();
       if (appSkipNodeDiagnostics != null) {
@@ -653,6 +1063,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
   }
 
+  @Override
+  @SuppressWarnings("unchecked")
+  public SchedulingPlacementSet<FiCaSchedulerNode> getSchedulingPlacementSet(
+      SchedulerRequestKey schedulerRequestKey) {
+    return super.getSchedulingPlacementSet(schedulerRequestKey);
+  }
+
   /**
    * Recalculates the per-app, percent of queue metric, specific to the
    * Capacity Scheduler.
@@ -690,4 +1107,29 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   public ReentrantReadWriteLock.WriteLock getWriteLock() {
     return this.writeLock;
   }
+
+  public void addToBeRemovedIncreaseRequest(
+      SchedContainerChangeRequest request) {
+    toBeRemovedIncRequests.put(request.getContainerId(), request);
+  }
+
+  public void removedToBeRemovedIncreaseRequests() {
+    // Remove invalid in request requests
+    if (!toBeRemovedIncRequests.isEmpty()) {
+      try {
+        writeLock.lock();
+        Iterator<Map.Entry<ContainerId, SchedContainerChangeRequest>> iter =
+            toBeRemovedIncRequests.entrySet().iterator();
+        while (iter.hasNext()) {
+          SchedContainerChangeRequest req = iter.next().getValue();
+          appSchedulingInfo.removeIncreaseRequest(req.getNodeId(),
+              req.getRMContainer().getAllocatedSchedulerKey(),
+              req.getContainerId());
+          iter.remove();
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  }
 }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java new file mode 100644 index 0000000..d275bfd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; + +import java.util.List; + +public class FifoAppAttempt extends FiCaSchedulerApp { + private static final Log LOG = LogFactory.getLog(FifoAppAttempt.class); + + FifoAppAttempt(ApplicationAttemptId appAttemptId, String user, + Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext) { + super(appAttemptId, user, queue, activeUsersManager, rmContext); + } + + public RMContainer allocate(NodeType type, FiCaSchedulerNode node, + SchedulerRequestKey schedulerKey, ResourceRequest request, + Container container) { + try { + writeLock.lock(); + + if (isStopped) { + return null; + } + + // Required sanity 
check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getTotalRequiredResources(schedulerKey) <= 0) { + return null; + } + + // Create RMContainer + RMContainer rmContainer = new RMContainerImpl(container, + this.getApplicationAttemptId(), node.getNodeID(), + appSchedulingInfo.getUser(), this.rmContext, + request.getNodeLabelExpression()); + ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); + + updateAMContainerDiagnostics(AMState.ASSIGNED, null); + + // Add it to allContainers list. + newlyAllocatedContainers.add(rmContainer); + + ContainerId containerId = container.getId(); + liveContainers.put(containerId, rmContainer); + + // Update consumption and track allocations + List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate( + type, node, schedulerKey, request, container); + + attemptResourceUsage.incUsed(node.getPartition(), + container.getResource()); + + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); + + // Inform the container + rmContainer.handle( + new RMContainerEvent(containerId, RMContainerEventType.START)); + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + containerId + .getApplicationAttemptId() + " container=" + containerId + " host=" + + container.getNodeId().getHost() + " type=" + type); + } + RMAuditLogger.logSuccess(getUser(), + RMAuditLogger.AuditConstants.ALLOC_CONTAINER, "SchedulerApp", + getApplicationId(), containerId, container.getResource()); + + return rmContainer; + } finally { + writeLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 92acf75..5ccde19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -103,7 +102,7 @@ import com.google.common.annotations.VisibleForTesting; @Evolving @SuppressWarnings("unchecked") public class FifoScheduler extends - AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements + AbstractYarnScheduler<FifoAppAttempt, FiCaSchedulerNode> implements Configurable { private static final Log LOG = LogFactory.getLog(FifoScheduler.class); @@ -239,7 +238,7 @@ public class 
FifoScheduler extends validateConf(conf); //Use ConcurrentSkipListMap because applications need to be ordered this.applications = - new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>(); + new ConcurrentSkipListMap<>(); this.minimumAllocation = Resources.createResource(conf.getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, @@ -328,7 +327,7 @@ public class FifoScheduler extends List<String> blacklistAdditions, List<String> blacklistRemovals, List<UpdateContainerRequest> increaseRequests, List<UpdateContainerRequest> decreaseRequests) { - FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); + FifoAppAttempt application = getApplicationAttempt(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed " + "or non existant application " + applicationAttemptId); @@ -384,8 +383,8 @@ public class FifoScheduler extends @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, String queue, String user, boolean isAppRecovering) { - SchedulerApplication<FiCaSchedulerApp> application = - new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user); + SchedulerApplication<FifoAppAttempt> application = + new SchedulerApplication<>(DEFAULT_QUEUE, user); applications.put(applicationId, application); metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user @@ -405,12 +404,12 @@ public class FifoScheduler extends addApplicationAttempt(ApplicationAttemptId appAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { - SchedulerApplication<FiCaSchedulerApp> application = + SchedulerApplication<FifoAppAttempt> application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); // TODO: Fix store - FiCaSchedulerApp schedulerApp = - new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, + FifoAppAttempt schedulerApp = + new FifoAppAttempt(appAttemptId, user, 
DEFAULT_QUEUE, activeUsersManager, this.rmContext); if (transferStateFromPreviousAttempt) { @@ -436,7 +435,7 @@ public class FifoScheduler extends private synchronized void doneApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication<FiCaSchedulerApp> application = + SchedulerApplication<FifoAppAttempt> application = applications.get(applicationId); if (application == null){ LOG.warn("Couldn't find application " + applicationId); @@ -454,8 +453,8 @@ public class FifoScheduler extends ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) throws IOException { - FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); - SchedulerApplication<FiCaSchedulerApp> application = + FifoAppAttempt attempt = getApplicationAttempt(applicationAttemptId); + SchedulerApplication<FifoAppAttempt> application = applications.get(applicationAttemptId.getApplicationId()); if (application == null || attempt == null) { throw new IOException("Unknown application " + applicationAttemptId + @@ -492,9 +491,9 @@ public class FifoScheduler extends " #applications=" + applications.size()); // Try to assign containers to applications in fifo order - for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications + for (Map.Entry<ApplicationId, SchedulerApplication<FifoAppAttempt>> e : applications .entrySet()) { - FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt(); + FifoAppAttempt application = e.getValue().getCurrentAppAttempt(); if (application == null) { continue; } @@ -536,9 +535,9 @@ public class FifoScheduler extends // Update the applications' headroom to correctly take into // account the containers assigned in this update. 
- for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) { - FiCaSchedulerApp attempt = - (FiCaSchedulerApp) application.getCurrentAppAttempt(); + for (SchedulerApplication<FifoAppAttempt> application : applications.values()) { + FifoAppAttempt attempt = + (FifoAppAttempt) application.getCurrentAppAttempt(); if (attempt == null) { continue; } @@ -546,7 +545,7 @@ public class FifoScheduler extends } } - private int getMaxAllocatableContainers(FiCaSchedulerApp application, + private int getMaxAllocatableContainers(FifoAppAttempt application, SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, NodeType type) { int maxContainers = 0; @@ -585,7 +584,7 @@ public class FifoScheduler extends private int assignContainersOnNode(FiCaSchedulerNode node, - FiCaSchedulerApp application, SchedulerRequestKey schedulerKey + FifoAppAttempt application, SchedulerRequestKey schedulerKey ) { // Data-local int nodeLocalContainers = @@ -612,7 +611,7 @@ public class FifoScheduler extends } private int assignNodeLocalContainers(FiCaSchedulerNode node, - FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) { + FifoAppAttempt application, SchedulerRequestKey schedulerKey) { int assignedContainers = 0; ResourceRequest request = application.getResourceRequest(schedulerKey, node.getNodeName()); @@ -638,7 +637,7 @@ public class FifoScheduler extends } private int assignRackLocalContainers(FiCaSchedulerNode node, - FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) { + FifoAppAttempt application, SchedulerRequestKey schedulerKey) { int assignedContainers = 0; ResourceRequest request = application.getResourceRequest(schedulerKey, node.getRMNode() @@ -664,7 +663,7 @@ public class FifoScheduler extends } private int assignOffSwitchContainers(FiCaSchedulerNode node, - FiCaSchedulerApp application, SchedulerRequestKey schedulerKey) { + FifoAppAttempt application, SchedulerRequestKey schedulerKey) { int assignedContainers = 0; ResourceRequest 
request = application.getResourceRequest(schedulerKey, ResourceRequest.ANY); @@ -676,7 +675,7 @@ public class FifoScheduler extends return assignedContainers; } - private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, + private int assignContainer(FiCaSchedulerNode node, FifoAppAttempt application, SchedulerRequestKey schedulerKey, int assignableContainers, ResourceRequest request, NodeType type) { LOG.debug("assignContainers:" + @@ -710,8 +709,8 @@ public class FifoScheduler extends // Allocate! // Inform the application - RMContainer rmContainer = - application.allocate(type, node, schedulerKey, request, container); + RMContainer rmContainer = application.allocate(type, node, schedulerKey, + request, container); // Inform the node node.allocateContainer(rmContainer); @@ -836,7 +835,7 @@ public class FifoScheduler extends // Get the application for the finished container Container container = rmContainer.getContainer(); - FiCaSchedulerApp application = + FifoAppAttempt application = getCurrentAttemptForContainer(container.getId()); ApplicationId appId = container.getId().getApplicationAttemptId().getApplicationId(); @@ -916,7 +915,7 @@ public class FifoScheduler extends @Override public RMContainer getRMContainer(ContainerId containerId) { - FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); + FifoAppAttempt attempt = getCurrentAttemptForContainer(containerId); return (attempt == null) ? 
null : attempt.getRMContainer(containerId); } @@ -937,7 +936,7 @@ public class FifoScheduler extends if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(applications.size()); - for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) { + for (SchedulerApplication<FifoAppAttempt> app : applications.values()) { attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId()); } return attempts; http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java new file mode 100644 index 0000000..2e6c3ca --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSet.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +import java.util.Iterator; +import java.util.Map; + +/** + * <p> + * PlacementSet is the central place that decides the order of nodes to fit + * asks by application. + * </p> + * + * <p> + * Also, PlacementSet can cache results (for example, ordered list) for + * better performance. + * </p> + * + * <p> + * PlacementSet can depend on one or more other PlacementSets. + * </p> + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface PlacementSet<N extends SchedulerNode> { + /** + * Get all nodes for this PlacementSet + * @return all nodes for this PlacementSet + */ + Map<NodeId, N> getAllNodes(); + + /** + * Version of the PlacementSet, can help other PlacementSet with dependencies + * deciding if update is required + * @return version + */ + long getVersion(); + + /** + * Partition of the PlacementSet. 
+ * @return node partition + */ + String getPartition(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java new file mode 100644 index 0000000..405122b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PlacementSetUtils.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public class PlacementSetUtils { + /* + * If the {@link PlacementSet} only has one entry, return it. otherwise + * return null + */ + public static <N extends SchedulerNode> N getSingleNode(PlacementSet<N> ps) { + N node = null; + if (1 == ps.getAllNodes().size()) { + node = ps.getAllNodes().values().iterator().next(); + } + + return node; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java new file mode 100644 index 0000000..da356f5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/ResourceRequestUpdateResult.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.api.records.ResourceRequest; + +/** + * Result of ResourceRequest update + */ +public class ResourceRequestUpdateResult { + private final ResourceRequest lastAnyResourceRequest; + private final ResourceRequest newResourceRequest; + + public ResourceRequestUpdateResult(ResourceRequest lastAnyResourceRequest, + ResourceRequest newResourceRequest) { + this.lastAnyResourceRequest = lastAnyResourceRequest; + this.newResourceRequest = newResourceRequest; + } + + public ResourceRequest getLastAnyResourceRequest() { + return lastAnyResourceRequest; + } + + public ResourceRequest getNewResourceRequest() { + return newResourceRequest; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java new file mode 100644 index 0000000..f87f764 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * <p> + * In addition to {@link PlacementSet}, this also maintains + * pending ResourceRequests: + * - When new ResourceRequest(s) added to scheduler, or, + * - Or new container allocated, scheduler can notify corresponding + * PlacementSet. + * </p> + * + * <p> + * Different set of resource requests (E.g., resource requests with the + * same schedulerKey) can have one instance of PlacementSet, each PlacementSet + * can have different ways to order nodes depends on requests. 
+ * </p> + */ +public interface SchedulingPlacementSet<N extends SchedulerNode> + extends PlacementSet<N> { + /** + * Get iterator of preferred nodes depending on requirement and/or availability + * @param clusterPlacementSet input cluster PlacementSet + * @return iterator of preferred nodes + */ + Iterator<N> getPreferredNodeIterator(PlacementSet<N> clusterPlacementSet); + + /** + * Replace existing ResourceRequest by the new requests + * + * @param requests new ResourceRequests + * @param recoverPreemptedRequestForAContainer if we're recovering resource + * requests for preempted container + * @return result of the ResourceRequest update + */ + ResourceRequestUpdateResult updateResourceRequests( + List<ResourceRequest> requests, + boolean recoverPreemptedRequestForAContainer); + + /** + * Get pending ResourceRequests by given schedulerRequestKey + * @return Map of resourceName to ResourceRequest + */ + Map<String, ResourceRequest> getResourceRequests(); + + /** + * Get ResourceRequest by given schedulerKey and resourceName + * @param resourceName resourceName + * @param schedulerRequestKey schedulerRequestKey + * @return ResourceRequest + */ + ResourceRequest getResourceRequest(String resourceName, + SchedulerRequestKey schedulerRequestKey); + + /** + * Notify container allocated. 
+ * @param type Type of the allocation + * @param node Which node this container allocated on + * @param request resource request + * @return list of ResourceRequests deducted + */ + List<ResourceRequest> allocate(NodeType type, SchedulerNode node, + ResourceRequest request); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java new file mode 100644 index 0000000..48efaa1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SimplePlacementSet.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; + +/** + * A simple PlacementSet which keeps an unordered map + */ +public class SimplePlacementSet<N extends SchedulerNode> + implements PlacementSet<N> { + + private Map<NodeId, N> map; + private String partition; + + public SimplePlacementSet(N node) { + if (null != node) { + // Only one node in the initial PlacementSet + this.map = ImmutableMap.of(node.getNodeID(), node); + this.partition = node.getPartition(); + } else { + this.map = Collections.emptyMap(); + this.partition = NodeLabel.DEFAULT_NODE_LABEL_PARTITION; + } + } + + public SimplePlacementSet(Map<NodeId, N> map, String partition) { + this.map = map; + this.partition = partition; + } + + @Override + public Map<NodeId, N> getAllNodes() { + return map; + } + + @Override + public long getVersion() { + return 0L; + } + + @Override + public String getPartition() { + return partition; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java index 7bec03a..b7cb1bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -35,7 +37,7 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti private static final Log LOG = LogFactory.getLog(OrderingPolicy.class); - protected TreeSet<S> schedulableEntities; + protected ConcurrentSkipListSet<S> schedulableEntities; protected Comparator<SchedulableEntity> comparator; protected Map<String, S> entitiesToReorder = new HashMap<String, S>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java index 3cfcd7a..3371df8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; import com.google.common.annotations.VisibleForTesting; @@ -61,7 +62,7 @@ public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractCom comparators ); this.comparator = fairComparator; - this.schedulableEntities = new TreeSet<S>(comparator); + this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator); } private double getMagnitude(SchedulableEntity r) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java index 10f8eeb..2d066bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; + import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** @@ -32,7 +34,7 @@ public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractCom comparators.add(new PriorityComparator()); comparators.add(new FifoComparator()); this.comparator = new CompoundComparator(comparators); - this.schedulableEntities = new TreeSet<S>(comparator); + this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java index 0891289..6ced9e2 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicyForPendingApps.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; import java.util.*; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import java.util.concurrent.ConcurrentSkipListSet; /** * This ordering policy is used for pending applications only. @@ -46,7 +47,7 @@ public class FifoOrderingPolicyForPendingApps<S extends SchedulableEntity> comparators.add(new PriorityComparator()); comparators.add(new FifoComparator()); this.comparator = new CompoundComparator(comparators); - this.schedulableEntities = new TreeSet<S>(comparator); + this.schedulableEntities = new ConcurrentSkipListSet<S>(comparator); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 58bb721..3861624 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -167,6 +168,28 @@ public class MockRM extends ResourceManager { } } + private void waitForState(ApplicationId appId, EnumSet<RMAppState> finalStates) + throws InterruptedException { + RMApp app = getRMContext().getRMApps().get(appId); + Assert.assertNotNull("app shouldn't be null", app); + final int timeoutMsecs = 80 * SECOND; + int timeWaiting = 0; + while (!finalStates.contains(app.getState())) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + LOG.info("App : " + appId + " State is : " + app.getState() + + " Waiting for state : " + finalStates); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + } + + LOG.info("App State is : " + app.getState()); + Assert.assertTrue("App State is not correct (timeout).", + finalStates.contains(app.getState())); + } + /** * Wait until an application has reached a specified state. * The timeout is 80 seconds. 
@@ -254,7 +277,7 @@ public class MockRM extends ResourceManager { RMAppAttemptState finalState, int timeoutMsecs) throws InterruptedException { int timeWaiting = 0; - while (!finalState.equals(attempt.getAppAttemptState())) { + while (finalState != attempt.getAppAttemptState()) { if (timeWaiting >= timeoutMsecs) { break; } @@ -267,7 +290,7 @@ public class MockRM extends ResourceManager { LOG.info("Attempt State is : " + attempt.getAppAttemptState()); Assert.assertEquals("Attempt state is not correct (timeout).", finalState, - attempt.getState()); + attempt.getState()); } public void waitForContainerToComplete(RMAppAttempt attempt, @@ -966,6 +989,26 @@ public class MockRM extends ResourceManager { rm.getResourceScheduler()).getApplicationAttempt(attemptId)); } + public static MockAM launchAMWhenAsyncSchedulingEnabled(RMApp app, MockRM rm) + throws Exception { + int i = 0; + while (app.getCurrentAppAttempt() == null) { + if (i < 100) { + i++; + } + Thread.sleep(50); + } + + RMAppAttempt attempt = app.getCurrentAppAttempt(); + + rm.waitForState(attempt.getAppAttemptId(), + RMAppAttemptState.ALLOCATED); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + + return am; + } + /** * NOTE: nm.nodeHeartbeat is explicitly invoked, * don't invoke it before calling launchAM http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 884e236..e48d9d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -1091,7 +1091,7 @@ public class TestClientRMService { rmContext.getScheduler().getSchedulerAppInfo(attemptId) .getLiveContainers()).thenReturn(rmContainers); ContainerStatus cs = mock(ContainerStatus.class); - when(containerimpl.getFinishedStatus()).thenReturn(cs); + when(containerimpl.completed()).thenReturn(false); when(containerimpl.getDiagnosticsInfo()).thenReturn("N/A"); when(containerimpl.getContainerExitStatus()).thenReturn(0); when(containerimpl.getContainerState()).thenReturn(ContainerState.COMPLETE); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java index 56d38fb..83a354d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java @@ -238,8 +238,10 @@ public class TestSchedulerHealth { SchedulerHealth sh = ((CapacityScheduler) resourceManager.getResourceScheduler()) .getSchedulerHealth(); - Assert.assertEquals(2, sh.getAllocationCount().longValue()); - Assert.assertEquals(Resource.newInstance(3 * 1024, 2), + // Now SchedulerHealth records last container allocated, aggregated + // allocation account will not be changed + Assert.assertEquals(1, sh.getAllocationCount().longValue()); + Assert.assertEquals(Resource.newInstance(1 * 1024, 1), sh.getResourcesAllocated()); Assert.assertEquals(2, sh.getAggregateAllocationCount().longValue()); Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails() http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 865449f..0aeedce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -134,6 +134,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -3453,7 +3454,7 @@ public class TestCapacityScheduler { scheduler.handle(new NodeRemovedSchedulerEvent( rm.getRMContext().getRMNodes().get(nm2.getNodeId()))); // schedulerNode is removed, try allocate a container - scheduler.allocateContainersToNode(node); + scheduler.allocateContainersToNode(new SimplePlacementSet<>(node), true); AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( @@ -3699,4 +3700,57 @@ public class TestCapacityScheduler { cs.handle(addAttemptEvent1); return appAttemptId1; } + + @Test + public void testAppAttemptLocalityStatistics() throws Exception { + Configuration conf = + TestUtils.getConfigurationWithMultipleQueues(new Configuration(false)); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + + final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(conf); + + MockRM rm = new MockRM(conf) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.start(); + MockNM nm1 = + new MockNM("h1:1234", 200 * GB, 
rm.getResourceTrackerService()); + nm1.registerNode(); + + // Launch app1 in queue=a1 + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a"); + + // Got one offswitch request and offswitch allocation + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // am1 asks for 1 GB resource on h1/default-rack/offswitch + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2), ResourceRequest + .newInstance(Priority.newInstance(1), "/default-rack", + Resources.createResource(1 * GB), 2), ResourceRequest + .newInstance(Priority.newInstance(1), "h1", + Resources.createResource(1 * GB), 1)), null); + + CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler(); + + // Got one nodelocal request and nodelocal allocation + cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId())); + + // Got one nodelocal request and racklocal allocation + cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId())); + + RMAppAttemptMetrics attemptMetrics = rm.getRMContext().getRMApps().get( + app1.getApplicationId()).getCurrentAppAttempt() + .getRMAppAttemptMetrics(); + + // We should get one node-local allocation, one rack-local allocation + // And one off-switch allocation + Assert.assertArrayEquals(new int[][] { { 1, 0, 0 }, { 0, 1, 0 }, { 0, 0, 1 } }, + attemptMetrics.getLocalityStatistics()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
