http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4c34c47..a419cc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -19,7 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -37,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import 
org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -51,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -58,13 +69,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.util.resource.Resources; import 
com.google.common.annotations.VisibleForTesting; @@ -87,7 +100,7 @@ public abstract class AbstractYarnScheduler protected Resource clusterResource = Resource.newInstance(0, 0); protected Resource minimumAllocation; - private Resource maximumAllocation; + protected Resource maximumAllocation; private Resource configuredMaximumAllocation; private int maxNodeMemory = -1; private int maxNodeVCores = -1; @@ -231,6 +244,65 @@ public abstract class AbstractYarnScheduler application.containerLaunchedOnNode(containerId, node.getNodeID()); } + + /** + * Reconcile the size a NodeManager reports for an increased container with + * the size the RM has recorded for that container. + */ + protected synchronized void containerIncreasedOnNode(ContainerId containerId, + SchedulerNode node, Container increasedContainerReportedByNM) { + // Get the application for the increased container + SchedulerApplicationAttempt application = + getCurrentAttemptForContainer(containerId); + if (application == null) { + LOG.info("Unknown application " + + containerId.getApplicationAttemptId().getApplicationId() + + " increased container " + containerId + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; + } + + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + // Scheduler has no record of it (e.g. it already completed); kill it. + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; + } + Resource rmContainerResource = rmContainer.getAllocatedResource(); + Resource nmContainerResource = increasedContainerReportedByNM.getResource(); + + + if (Resources.equals(nmContainerResource, rmContainerResource)){ + // NM reported expected container size, tell RMContainer. Which will stop + // container expire monitor + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.NM_DONE_CHANGE_RESOURCE)); + } else if (Resources.fitsIn(getResourceCalculator(), clusterResource, + nmContainerResource, rmContainerResource)) { + // when rmContainerResource >= nmContainerResource, we won't do anything, + // it is possible a container increased is issued by RM, but AM hasn't + // told NM.
+ } else if (Resources.fitsIn(getResourceCalculator(), clusterResource, + rmContainerResource, nmContainerResource)) { + // When rmContainerResource <= nmContainerResource, it could happen when a + // container decreased by RM before it is increased in NM. + + // Tell NM to decrease the container + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeDecreaseContainerEvent(node.getNodeID(), + Arrays.asList(rmContainer.getContainer()))); + } else { + // Something wrong happened, kill the container + LOG.warn("Something wrong happened, container size reported by NM" + + " is not expected, ContainerID=" + containerId + + " rm-size-resource:" + rmContainerResource + " nm-size-resource:" + + nmContainerResource); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + } + } public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { SchedulerApplication<T> app = @@ -511,6 +583,40 @@ public abstract class AbstractYarnScheduler SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); } } + + /** + * Apply the given decrease requests, first dropping any pending increase + * request the attempt holds for the same container. + */ + protected void decreaseContainers( + List<SchedContainerChangeRequest> decreaseRequests, + SchedulerApplicationAttempt attempt) { + for (SchedContainerChangeRequest request : decreaseRequests) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing decrease request:" + request); + } + + boolean hasIncreaseRequest = + attempt.removeIncreaseRequest(request.getNodeId(), + request.getPriority(), request.getContainerId()); + + if (hasIncreaseRequest) { + if (LOG.isDebugEnabled()) { + LOG.debug("While processing decrease request, found an increase request " + + "for the same container " + + request.getContainerId() + + ", removed the increase request"); + } + } + + // handle decrease request + decreaseContainer(request, attempt); + } + } + + protected abstract void decreaseContainer( + SchedContainerChangeRequest decreaseRequest, + SchedulerApplicationAttempt attempt); public
SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); @@ -734,4 +840,60 @@ public abstract class AbstractYarnScheduler LOG.info("Updated the cluste max priority to maxClusterLevelAppPriority = " + maxClusterLevelAppPriority); } + + /** + * Normalize container increase/decrease request, and return + * {@link SchedContainerChangeRequest} according to given + * ContainerResourceChangeRequest. + * + * <pre> + * - Returns non-null value means validation succeeded + * - Throw exception when any other error happens + * </pre> + */ + private SchedContainerChangeRequest + checkAndNormalizeContainerChangeRequest( + ContainerResourceChangeRequest request, boolean increase) + throws YarnException { + // We have done a check in ApplicationMasterService, but RMContainer status + // / Node resource could change since AMS won't acquire lock of scheduler. + RMServerUtils.checkAndNormalizeContainerChangeRequest(rmContext, request, + increase); + ContainerId containerId = request.getContainerId(); + RMContainer rmContainer = getRMContainer(containerId); + SchedulerNode schedulerNode = + getSchedulerNode(rmContainer.getAllocatedNode()); + + return new SchedContainerChangeRequest(schedulerNode, rmContainer, + request.getCapability()); + } + + /** + * Validate and normalize a batch of change requests. Requests rejected with + * a YarnException are logged and skipped; never returns null. + */ + protected List<SchedContainerChangeRequest> + checkAndNormalizeContainerChangeRequests( + List<ContainerResourceChangeRequest> changeRequests, + boolean increase) { + if (null == changeRequests || changeRequests.isEmpty()) { + return Collections.emptyList(); + } + + List<SchedContainerChangeRequest> schedulerChangeRequests = + new ArrayList<SchedContainerChangeRequest>(); + for (ContainerResourceChangeRequest r : changeRequests) { + SchedContainerChangeRequest sr = null; + try { + sr = checkAndNormalizeContainerChangeRequest(r, increase); + } catch (YarnException e) { + LOG.warn("Error happens when checking increase/decrease request, Ignoring.."
+ " exception=", e); + continue; + } + schedulerChangeRequests.add(sr); + } + + return schedulerChangeRequests; + } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index 3f2d8af..af6caad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -34,6 +34,9 @@ public class Allocation { final Set<ContainerId> fungibleContainers; final List<ResourceRequest> fungibleResources; final List<NMToken> nmTokens; + final List<Container> increasedContainers; + final List<Container> decreasedContainers; + public Allocation(List<Container> containers, Resource resourceLimit, Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, @@ -45,12 +48,26 @@ public class Allocation { public Allocation(List<Container> containers, Resource resourceLimit, Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) { + this(containers, resourceLimit, strictContainers, fungibleContainers, + fungibleResources, nmTokens, null, null); + } + + /** + * Creates an allocation that also carries resource-changed containers; + * {@code increasedContainers} and {@code decreasedContainers} may be null. 
+ */ + public Allocation(List<Container> containers, Resource resourceLimit, + Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, + List<ResourceRequest>
fungibleResources, List<NMToken> nmTokens, + List<Container> increasedContainers, List<Container> decreasedContainers) { this.containers = containers; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; this.fungibleContainers = fungibleContainers; this.fungibleResources = fungibleResources; this.nmTokens = nmTokens; + this.increasedContainers = increasedContainers; + this.decreasedContainers = decreasedContainers; } public List<Container> getContainers() { @@ -76,5 +93,12 @@ public class Allocation { public List<NMToken> getNMTokens() { return nmTokens; } - + + public List<Container> getIncreasedContainers() { + return increasedContainers; + } + + public List<Container> getDecreasedContainers() { + return decreasedContainers; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index e318d47..7623da0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; import java.util.Collection; +import
java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -35,6 +37,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -63,8 +67,11 @@ public class AppSchedulingInfo { final Set<Priority> priorities = new TreeSet<Priority>( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); - final Map<Priority, Map<String, ResourceRequest>> requests = - new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>(); + final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap = + new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>(); + final Map<NodeId, Map<Priority, Map<ContainerId, + SchedContainerChangeRequest>>> increaseRequestMap = + new ConcurrentHashMap<>(); private Set<String> userBlacklist = new HashSet<>(); private Set<String> amBlacklist = new HashSet<>(); @@ -114,13 +121,177 @@ public class AppSchedulingInfo { */ private synchronized void clearRequests() { priorities.clear(); - requests.clear(); + resourceRequestMap.clear(); LOG.info("Application " + applicationId + " requests cleared"); } public long getNewContainerId() { return this.containerIdCounter.incrementAndGet(); } + + public boolean hasIncreaseRequest(NodeId nodeId) { + Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = + increaseRequestMap.get(nodeId); + if (null ==
requestsOnNode) { + return false; + } + return requestsOnNode.size() > 0; + } + + public Map<ContainerId, SchedContainerChangeRequest> + getIncreaseRequests(NodeId nodeId, Priority priority) { + Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = + increaseRequestMap.get(nodeId); + if (null == requestsOnNode) { + return null; + } + + return requestsOnNode.get(priority); + } + + /** + * Record pending increase requests, dropping any earlier request for the + * same container whose target differs. Returns true iff one was inserted. + */ + public synchronized boolean updateIncreaseRequests( + List<SchedContainerChangeRequest> increaseRequests) { + boolean resourceUpdated = false; + + for (SchedContainerChangeRequest r : increaseRequests) { + NodeId nodeId = r.getRMContainer().getAllocatedNode(); + + // Map levels are created lazily by insertIncreaseRequest, so a request + // that is ignored below never leaves an empty per-node entry behind in + // increaseRequestMap (which would block deactivation). + SchedContainerChangeRequest prevChangeRequest = + getIncreaseRequest(nodeId, r.getPriority(), r.getContainerId()); + if (null != prevChangeRequest) { + if (Resources.equals(prevChangeRequest.getTargetCapacity(), + r.getTargetCapacity())) { + // New target capacity is as same as what we have, just ignore the new + // one + continue; + } + + // remove the old one + removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(), + prevChangeRequest.getContainerId()); + } + + if (Resources.equals(r.getTargetCapacity(), r.getRMContainer().getAllocatedResource())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to increase/decrease container, " + + "target capacity = previous capacity = " + r.getTargetCapacity() + + " for container=" + r.getContainerId() + + ".
Will ignore this increase request"); + } + continue; + } + + // add the new one + resourceUpdated = true; + insertIncreaseRequest(r); + } + return resourceUpdated; + } + + // insert increase request and add missing hierarchy if missing + private void insertIncreaseRequest(SchedContainerChangeRequest request) { + NodeId nodeId = request.getNodeId(); + Priority priority = request.getPriority(); + ContainerId containerId = request.getContainerId(); + + Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = + increaseRequestMap.get(nodeId); + if (null == requestsOnNode) { + requestsOnNode = + new HashMap<Priority, Map<ContainerId, SchedContainerChangeRequest>>(); + increaseRequestMap.put(nodeId, requestsOnNode); + } + + Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = + requestsOnNode.get(priority); + if (null == requestsOnNodeWithPriority) { + requestsOnNodeWithPriority = + new TreeMap<ContainerId, SchedContainerChangeRequest>(); + requestsOnNode.put(priority, requestsOnNodeWithPriority); + } + + requestsOnNodeWithPriority.put(containerId, request); + + // update resources + String partition = request.getRMContainer().getNodeLabelExpression(); + Resource delta = request.getDeltaCapacity(); + appResourceUsage.incPending(partition, delta); + queue.incPendingResource(partition, delta); + + if (LOG.isDebugEnabled()) { + LOG.debug("Added increase request:" + request.getContainerId() + + " delta=" + request.getDeltaCapacity()); + } + + // update priorities + priorities.add(priority); + } + + public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority, + ContainerId containerId) { + Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = + increaseRequestMap.get(nodeId); + if (null == requestsOnNode) { + return false; + } + + Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = + requestsOnNode.get(priority); + if (null == requestsOnNodeWithPriority) { + return
false; + } + + SchedContainerChangeRequest request = + requestsOnNodeWithPriority.remove(containerId); + + // remove hierarchies if it becomes empty + if (requestsOnNodeWithPriority.isEmpty()) { + requestsOnNode.remove(priority); + } + if (requestsOnNode.isEmpty()) { + increaseRequestMap.remove(nodeId); + } + + if (request == null) { + return false; + } + + // update queue's pending resource if request exists + String partition = request.getRMContainer().getNodeLabelExpression(); + Resource delta = request.getDeltaCapacity(); + appResourceUsage.decPending(partition, delta); + queue.decPendingResource(partition, delta); + + if (LOG.isDebugEnabled()) { + LOG.debug("remove increase request:" + request); + } + + return true; + } + + public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId, + Priority priority, ContainerId containerId) { + Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode = + increaseRequestMap.get(nodeId); + if (null == requestsOnNode) { + return null; + } + + Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority = + requestsOnNode.get(priority); + if (null == requestsOnNodeWithPriority) { + return null; + } + + return requestsOnNodeWithPriority.get(containerId); + } /** * The ApplicationMaster is updating resource requirements for the @@ -163,11 +334,11 @@ public class AppSchedulingInfo { } } - Map<String, ResourceRequest> asks = this.requests.get(priority); + Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority); if (asks == null) { asks = new ConcurrentHashMap<String, ResourceRequest>(); - this.requests.put(priority, asks); + this.resourceRequestMap.put(priority, asks); this.priorities.add(priority); } lastRequest = asks.get(resourceName); @@ -260,12 +431,12 @@ public class AppSchedulingInfo { synchronized public Map<String, ResourceRequest> getResourceRequests( Priority priority) { - return requests.get(priority); + return resourceRequestMap.get(priority); } public
List<ResourceRequest> getAllResourceRequests() { List<ResourceRequest> ret = new ArrayList<ResourceRequest>(); - for (Map<String, ResourceRequest> r : requests.values()) { + for (Map<String, ResourceRequest> r : resourceRequestMap.values()) { ret.addAll(r.values()); } return ret; @@ -273,7 +444,7 @@ public class AppSchedulingInfo { synchronized public ResourceRequest getResourceRequest(Priority priority, String resourceName) { - Map<String, ResourceRequest> nodeRequests = requests.get(priority); + Map<String, ResourceRequest> nodeRequests = resourceRequestMap.get(priority); return (nodeRequests == null) ? null : nodeRequests.get(resourceName); } @@ -301,6 +472,50 @@ public class AppSchedulingInfo { } } + public synchronized void increaseContainer( + SchedContainerChangeRequest increaseRequest) { + NodeId nodeId = increaseRequest.getNodeId(); + Priority priority = increaseRequest.getPriority(); + ContainerId containerId = increaseRequest.getContainerId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("allocated increase request : applicationId=" + applicationId + + " container=" + containerId + " host=" + + increaseRequest.getNodeId() + " user=" + user + " resource=" + + increaseRequest.getDeltaCapacity()); + } + + // Set queue metrics + queue.getMetrics().allocateResources(user, 0, + increaseRequest.getDeltaCapacity(), true); + + // remove the increase request from pending increase request map + removeIncreaseRequest(nodeId, priority, containerId); + + // update usage + appResourceUsage.incUsed(increaseRequest.getNodePartition(), + increaseRequest.getDeltaCapacity()); + } + + public synchronized void decreaseContainer( + SchedContainerChangeRequest decreaseRequest) { + // Delta is negative when it's a decrease request + Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Decrease container : applicationId=" + applicationId + + " container=" + decreaseRequest.getContainerId() + " host=" +
decreaseRequest.getNodeId() + " user=" + user + " resource=" + + absDelta); + } + + // Set queue metrics + queue.getMetrics().releaseResources(user, 0, absDelta); + + // update usage + appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta); + } + /** * Resources have been allocated to this application by the resource * scheduler. Track them. @@ -359,11 +574,11 @@ public class AppSchedulingInfo { // Update future requirements decResourceRequest(node.getNodeName(), priority, nodeLocalRequest); - ResourceRequest rackLocalRequest = requests.get(priority).get( + ResourceRequest rackLocalRequest = resourceRequestMap.get(priority).get( node.getRackName()); decResourceRequest(node.getRackName(), priority, rackLocalRequest); - ResourceRequest offRackRequest = requests.get(priority).get( + ResourceRequest offRackRequest = resourceRequestMap.get(priority).get( ResourceRequest.ANY); decrementOutstanding(offRackRequest); @@ -377,7 +592,7 @@ public class AppSchedulingInfo { ResourceRequest request) { request.setNumContainers(request.getNumContainers() - 1); if (request.getNumContainers() == 0) { - requests.get(priority).remove(resourceName); + resourceRequestMap.get(priority).remove(resourceName); } } @@ -394,7 +609,7 @@ public class AppSchedulingInfo { // Update future requirements decResourceRequest(node.getRackName(), priority, rackLocalRequest); - ResourceRequest offRackRequest = requests.get(priority).get( + ResourceRequest offRackRequest = resourceRequestMap.get(priority).get( ResourceRequest.ANY); decrementOutstanding(offRackRequest); @@ -449,6 +664,12 @@ public class AppSchedulingInfo { } } } + + // Deactivate only if no increase request is pending either + if (deactivate) { + deactivate = increaseRequestMap.isEmpty(); + } + if (deactivate) { activeUsersManager.deactivateApplication(user, applicationId); } @@ -457,7 +678,7 @@ public class AppSchedulingInfo { synchronized public void move(Queue newQueue) { QueueMetrics oldMetrics = queue.getMetrics(); QueueMetrics
newMetrics = newQueue.getMetrics(); - for (Map<String, ResourceRequest> asks : requests.values()) { + for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { ResourceRequest request = asks.get(ResourceRequest.ANY); if (request != null) { oldMetrics.decrPendingResources(user, request.getNumContainers(), @@ -484,7 +705,7 @@ public class AppSchedulingInfo { synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { // clear pending resources metrics for the application QueueMetrics metrics = queue.getMetrics(); - for (Map<String, ResourceRequest> asks : requests.values()) { + for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) { ResourceRequest request = asks.get(ResourceRequest.ANY); if (request != null) { metrics.decrPendingResources(user, request.getNumContainers(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 09fd73e..d94b621 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -373,17 +373,19 @@ public class QueueMetrics implements MetricsSource { } private void _decrPendingResources(int
containers, Resource res) { pendingContainers.decr(containers); pendingMB.decr(res.getMemory() * containers); pendingVCores.decr(res.getVirtualCores() * containers); } public void allocateResources(String user, int containers, Resource res, boolean decrPending) { + // if #containers = 0, means change container resource allocatedContainers.incr(containers); aggregateContainersAllocated.incr(containers); - allocatedMB.incr(res.getMemory() * containers); - allocatedVCores.incr(res.getVirtualCores() * containers); + + allocatedMB.incr(res.getMemory() * Math.max(containers, 1)); + allocatedVCores.incr(res.getVirtualCores() * Math.max(containers, 1)); if (decrPending) { _decrPendingResources(containers, res); } @@ -397,10 +399,11 @@ public class QueueMetrics implements MetricsSource { } public void releaseResources(String user, int containers, Resource res) { + // if #container = 0, means change container resource.
allocatedContainers.decr(containers); aggregateContainersReleased.incr(containers); - allocatedMB.decr(res.getMemory() * containers); - allocatedVCores.decr(res.getVirtualCores() * containers); + allocatedMB.decr(res.getMemory() * Math.max(containers, 1)); + allocatedVCores.decr(res.getVirtualCores() * Math.max(containers, 1)); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.releaseResources(user, containers, res); http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java new file mode 100644 index 0000000..ea109fd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * This is ContainerResourceChangeRequest in scheduler side, it contains some + * pointers to runtime objects like RMContainer, SchedulerNode, etc. This will + * be easier for scheduler making decision.
+ */ +public class SchedContainerChangeRequest implements + Comparable<SchedContainerChangeRequest> { + RMContainer rmContainer; + Resource targetCapacity; + SchedulerNode schedulerNode; + Resource deltaCapacity; + + public SchedContainerChangeRequest(SchedulerNode schedulerNode, + RMContainer rmContainer, Resource targetCapacity) { + this.rmContainer = rmContainer; + this.targetCapacity = targetCapacity; + this.schedulerNode = schedulerNode; + deltaCapacity = Resources.subtract(targetCapacity, + rmContainer.getAllocatedResource()); + } + + public NodeId getNodeId() { + return this.rmContainer.getAllocatedNode(); + } + + public RMContainer getRMContainer() { + return this.rmContainer; + } + + public Resource getTargetCapacity() { + return this.targetCapacity; + } + + /** + * Delta capacity = target - before, so if it is a decrease request, delta + * capacity will be negative + */ + public Resource getDeltaCapacity() { + return deltaCapacity; + } + + public Priority getPriority() { + return rmContainer.getContainer().getPriority(); + } + + public ContainerId getContainerId() { + return rmContainer.getContainerId(); + } + + public String getNodePartition() { + return schedulerNode.getPartition(); + } + + public SchedulerNode getSchedulerNode() { + return schedulerNode; + } + + @Override + public int hashCode() { + return getContainerId().hashCode(); // equals() ignores targetCapacity + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SchedContainerChangeRequest)) { + return false; + } + return compareTo((SchedContainerChangeRequest)other) == 0; + } + + @Override + public int compareTo(SchedContainerChangeRequest other) { + if (other == null) { + return -1; + } + + int rc = getPriority().compareTo(other.getPriority()); + if (0 != rc) { + return rc; + } + + return getContainerId().compareTo(other.getContainerId()); + } + + @Override + public String toString() { + return "<container=" + getContainerId() + ", targetCapacity=" +
targetCapacity + ", delta=" + deltaCapacity + ", node=" + + getNodeId().toString() + ">"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java index 519de98..96288f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java @@ -28,7 +28,7 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> { private Queue queue; private final String user; - private T currentAttempt; + private volatile T currentAttempt; private volatile Priority priority; public SchedulerApplication(Queue queue, String user) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b361d15..f064e97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -51,16 +53,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; +import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -104,8 +109,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); - protected List<RMContainer> newlyAllocatedContainers = - new ArrayList<RMContainer>(); + protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>(); + protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>(); + protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>(); + protected Set<NMToken> updatedNMTokens = new HashSet<>(); // This pendingRelease is used in work-preserving recovery scenario to keep // track of the AM's outstanding release requests. 
RM on recovery could @@ -219,7 +226,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return appSchedulingInfo.getPriorities(); } - public synchronized ResourceRequest getResourceRequest(Priority priority, String resourceName) { + public synchronized ResourceRequest getResourceRequest(Priority priority, + String resourceName) { return this.appSchedulingInfo.getResourceRequest(priority, resourceName); } @@ -324,24 +332,28 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return reservedContainers; } - public synchronized RMContainer reserve(SchedulerNode node, Priority priority, - RMContainer rmContainer, Container container) { - // Create RMContainer if necessary - if (rmContainer == null) { - rmContainer = - new RMContainerImpl(container, getApplicationAttemptId(), - node.getNodeID(), appSchedulingInfo.getUser(), rmContext); + public synchronized boolean reserveIncreasedContainer(SchedulerNode node, + Priority priority, RMContainer rmContainer, Resource reservedResource) { + if (commonReserve(node, priority, rmContainer, reservedResource)) { attemptResourceUsage.incReserved(node.getPartition(), - container.getResource()); - - // Reset the re-reservation count - resetReReservations(priority); - } else { - // Note down the re-reservation - addReReservation(priority); + reservedResource); + // succeeded + return true; + } + + return false; + } + + private synchronized boolean commonReserve(SchedulerNode node, + Priority priority, RMContainer rmContainer, Resource reservedResource) { + try { + rmContainer.handle(new RMContainerReservedEvent(rmContainer + .getContainerId(), reservedResource, node.getNodeID(), priority)); + } catch (InvalidStateTransitionException e) { + // We reach here could be caused by container already finished, return + // false indicate it fails + return false; } - rmContainer.handle(new RMContainerReservedEvent(container.getId(), - container.getResource(), node.getNodeID(), priority)); 
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(priority); @@ -356,8 +368,30 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { + " reserved container " + rmContainer + " on node " + node + ". This attempt currently has " + reservedContainers.size() + " reserved containers at priority " + priority - + "; currentReservation " + container.getResource()); + + "; currentReservation " + reservedResource); } + + return true; + } + + public synchronized RMContainer reserve(SchedulerNode node, + Priority priority, RMContainer rmContainer, Container container) { + // Create RMContainer if necessary + if (rmContainer == null) { + rmContainer = + new RMContainerImpl(container, getApplicationAttemptId(), + node.getNodeID(), appSchedulingInfo.getUser(), rmContext); + attemptResourceUsage.incReserved(node.getPartition(), + container.getResource()); + + // Reset the re-reservation count + resetReReservations(priority); + } else { + // Note down the re-reservation + addReReservation(priority); + } + + commonReserve(node, priority, rmContainer, container.getResource()); return rmContainer; } @@ -437,69 +471,100 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { public Resource getCurrentConsumption() { return attemptResourceUsage.getUsed(); } - - public static class ContainersAndNMTokensAllocation { - List<Container> containerList; - List<NMToken> nmTokenList; - - public ContainersAndNMTokensAllocation(List<Container> containerList, - List<NMToken> nmTokenList) { - this.containerList = containerList; - this.nmTokenList = nmTokenList; + + private Container updateContainerAndNMToken(RMContainer rmContainer, + boolean newContainer, boolean increasedContainer) { + Container container = rmContainer.getContainer(); + ContainerType containerType = ContainerType.TASK; + // The working knowledge is that masterContainer for AM is null as it + // itself is the master container. 
+ RMAppAttempt appAttempt = rmContext.getRMApps() + .get(container.getId().getApplicationAttemptId().getApplicationId()) + .getCurrentAppAttempt(); + if (isWaitingForAMContainer(getApplicationId())) { + containerType = ContainerType.APPLICATION_MASTER; } - - public List<Container> getContainerList() { - return containerList; + try { + // create container token and NMToken altogether. + container.setContainerToken(rmContext.getContainerTokenSecretManager() + .createContainerToken(container.getId(), container.getNodeId(), + getUser(), container.getResource(), container.getPriority(), + rmContainer.getCreationTime(), this.logAggregationContext, + rmContainer.getNodeLabelExpression(), containerType)); + NMToken nmToken = + rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), + getApplicationAttemptId(), container); + if (nmToken != null) { + updatedNMTokens.add(nmToken); + } + } catch (IllegalArgumentException e) { + // DNS might be down, skip returning this container. + LOG.error("Error trying to assign container token and NM token to" + + " an updated container " + container.getId(), e); + return null; } - - public List<NMToken> getNMTokenList() { - return nmTokenList; + + if (newContainer) { + rmContainer.handle(new RMContainerEvent( + rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)); + } else { + rmContainer.handle(new RMContainerUpdatesAcquiredEvent( + rmContainer.getContainerId(), increasedContainer)); } + return container; } - // Create container token and NMToken altogether, if either of them fails for + // Create container token and update NMToken altogether, if either of them fails for // some reason like DNS unavailable, do not return this container and keep it // in the newlyAllocatedContainers waiting to be refetched. 
- public synchronized ContainersAndNMTokensAllocation - pullNewlyAllocatedContainersAndNMTokens() { + public synchronized List<Container> pullNewlyAllocatedContainers() { List<Container> returnContainerList = new ArrayList<Container>(newlyAllocatedContainers.size()); - List<NMToken> nmTokens = new ArrayList<NMToken>(); for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i - .hasNext();) { + .hasNext();) { RMContainer rmContainer = i.next(); - Container container = rmContainer.getContainer(); - ContainerType containerType = ContainerType.TASK; - boolean isWaitingForAMContainer = isWaitingForAMContainer( - container.getId().getApplicationAttemptId().getApplicationId()); - if (isWaitingForAMContainer) { - containerType = ContainerType.APPLICATION_MASTER; + Container updatedContainer = + updateContainerAndNMToken(rmContainer, true, false); + // Only add container to return list when it's not null. updatedContainer + // could be null when generate token failed, it can be caused by DNS + // resolving failed. + if (updatedContainer != null) { + returnContainerList.add(updatedContainer); + i.remove(); } - try { - // create container token and NMToken altogether. - container.setContainerToken(rmContext.getContainerTokenSecretManager() - .createContainerToken(container.getId(), container.getNodeId(), - getUser(), container.getResource(), container.getPriority(), - rmContainer.getCreationTime(), this.logAggregationContext, - rmContainer.getNodeLabelExpression(), containerType)); - NMToken nmToken = - rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), - getApplicationAttemptId(), container); - if (nmToken != null) { - nmTokens.add(nmToken); - } - } catch (IllegalArgumentException e) { - // DNS might be down, skip returning this container. 
- LOG.error("Error trying to assign container token and NM token to" + - " an allocated container " + container.getId(), e); - continue; + } + return returnContainerList; + } + + private synchronized List<Container> pullNewlyUpdatedContainers( + Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) { + List<Container> returnContainerList = + new ArrayList<Container>(updatedContainerMap.size()); + for (Iterator<Entry<ContainerId, RMContainer>> i = + updatedContainerMap.entrySet().iterator(); i.hasNext();) { + RMContainer rmContainer = i.next().getValue(); + Container updatedContainer = + updateContainerAndNMToken(rmContainer, false, increase); + if (updatedContainer != null) { + returnContainerList.add(updatedContainer); + i.remove(); } - returnContainerList.add(container); - i.remove(); - rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), - RMContainerEventType.ACQUIRED)); } - return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens); + return returnContainerList; + } + + public synchronized List<Container> pullNewlyIncreasedContainers() { + return pullNewlyUpdatedContainers(newlyIncreasedContainers, true); + } + + public synchronized List<Container> pullNewlyDecreasedContainers() { + return pullNewlyUpdatedContainers(newlyDecreasedContainers, false); + } + + public synchronized List<NMToken> pullUpdatedNMTokens() { + List<NMToken> returnList = new ArrayList<NMToken>(updatedNMTokens); + updatedNMTokens.clear(); + return returnList; } public boolean isWaitingForAMContainer(ApplicationId applicationId) { @@ -770,4 +835,50 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return attemptResourceUsage; } -} + public synchronized boolean removeIncreaseRequest(NodeId nodeId, + Priority priority, ContainerId containerId) { + return appSchedulingInfo.removeIncreaseRequest(nodeId, priority, + containerId); + } + + public synchronized boolean updateIncreaseRequests( + List<SchedContainerChangeRequest> 
increaseRequests) { + return appSchedulingInfo.updateIncreaseRequests(increaseRequests); + } + + private synchronized void changeContainerResource( + SchedContainerChangeRequest changeRequest, boolean increase) { + if (increase) { + appSchedulingInfo.increaseContainer(changeRequest); + } else { + appSchedulingInfo.decreaseContainer(changeRequest); + } + + RMContainer changedRMContainer = changeRequest.getRMContainer(); + changedRMContainer.handle( + new RMContainerChangeResourceEvent(changeRequest.getContainerId(), + changeRequest.getTargetCapacity(), increase)); + + // remove pending and not pulled by AM newly-increased/decreased-containers + // and add the new one + if (increase) { + newlyDecreasedContainers.remove(changeRequest.getContainerId()); + newlyIncreasedContainers.put(changeRequest.getContainerId(), + changedRMContainer); + } else { + newlyIncreasedContainers.remove(changeRequest.getContainerId()); + newlyDecreasedContainers.put(changeRequest.getContainerId(), + changedRMContainer); + } + } + + public synchronized void decreaseContainer( + SchedContainerChangeRequest decreaseRequest) { + changeContainerResource(decreaseRequest, false); + } + + public synchronized void increaseContainer( + SchedContainerChangeRequest increaseRequest) { + changeContainerResource(increaseRequest, true); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index f03663a..f3d3906 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -157,6 +157,37 @@ public abstract class SchedulerNode { + getUsedResource() + " used and " + getAvailableResource() + " available after allocation"); } + + private synchronized void changeContainerResource(ContainerId containerId, + Resource deltaResource, boolean increase) { + if (increase) { + deductAvailableResource(deltaResource); + } else { + addAvailableResource(deltaResource); + } + + LOG.info((increase ? "Increased" : "Decreased") + " container " + + containerId + " of capacity " + deltaResource + " on host " + + rmNode.getNodeAddress() + ", which has " + numContainers + + " containers, " + getUsedResource() + " used and " + + getAvailableResource() + " available after allocation"); + } + + /** + * The Scheduler increased container + */ + public synchronized void increaseContainer(ContainerId containerId, + Resource deltaResource) { + changeContainerResource(containerId, deltaResource, true); + } + + /** + * The Scheduler decreased container + */ + public synchronized void decreaseContainer(ContainerId containerId, + Resource deltaResource) { + changeContainerResource(containerId, deltaResource, false); + } /** * Get available resources on the node. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 8047d0b..abefee8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -361,7 +361,7 @@ public class SchedulerUtils { } public static boolean checkResourceRequestMatchingNodePartition( - ResourceRequest offswitchResourceRequest, String nodePartition, + String requestedPartition, String nodePartition, SchedulingMode schedulingMode) { // We will only look at node label = nodeLabelToLookAt according to // schedulingMode and partition of node. 
@@ -371,12 +371,11 @@ public class SchedulerUtils { } else { nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; } - - String askedNodePartition = offswitchResourceRequest.getNodeLabelExpression(); - if (null == askedNodePartition) { - askedNodePartition = RMNodeLabelsManager.NO_LABEL; + + if (null == requestedPartition) { + requestedPartition = RMNodeLabelsManager.NO_LABEL; } - return askedNodePartition.equals(nodePartitionToLookAt); + return requestedPartition.equals(nodePartitionToLookAt); } private static boolean hasPendingResourceRequest(ResourceCalculator rc, http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index e3c79f7..9825ae0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import 
org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -133,16 +134,17 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> { * @param release * @param blacklistAdditions * @param blacklistRemovals + * @param increaseRequests + * @param decreaseRequests * @return the {@link Allocation} for the application */ @Public @Stable - Allocation - allocate(ApplicationAttemptId appAttemptId, - List<ResourceRequest> ask, - List<ContainerId> release, - List<String> blacklistAdditions, - List<String> blacklistRemovals); + Allocation allocate(ApplicationAttemptId appAttemptId, + List<ResourceRequest> ask, List<ContainerId> release, + List<String> blacklistAdditions, List<String> blacklistRemovals, + List<ContainerResourceChangeRequest> increaseRequests, + List<ContainerResourceChangeRequest> decreaseRequests); /** * Get node resource usage report. http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 0ae4d1a..9f61b11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -43,10 +43,10 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -76,7 +76,7 @@ public abstract class AbstractCSQueue implements CSQueue { private boolean preemptionDisabled; // Track resource usage-by-label like used-resource/pending-resource, etc. - ResourceUsage queueUsage; + volatile ResourceUsage queueUsage; // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity, // etc. 
@@ -340,22 +340,27 @@ public abstract class AbstractCSQueue implements CSQueue { return minimumAllocation; } - synchronized void allocateResource(Resource clusterResource, - Resource resource, String nodePartition) { + synchronized void allocateResource(Resource clusterResource, + Resource resource, String nodePartition, boolean changeContainerResource) { queueUsage.incUsed(nodePartition, resource); - ++numContainers; + if (!changeContainerResource) { + ++numContainers; + } CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, minimumAllocation, this, labelManager, nodePartition); } protected synchronized void releaseResource(Resource clusterResource, - Resource resource, String nodePartition) { + Resource resource, String nodePartition, boolean changeContainerResource) { queueUsage.decUsed(nodePartition, resource); CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, minimumAllocation, this, labelManager, nodePartition); - --numContainers; + + if (!changeContainerResource) { + --numContainers; + } } @Private @@ -446,8 +451,8 @@ public abstract class AbstractCSQueue implements CSQueue { } synchronized boolean canAssignToThisQueue(Resource clusterResource, - String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved, - SchedulingMode schedulingMode) { + String nodePartition, ResourceLimits currentResourceLimits, + Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { // Get current limited resource: // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect // queues' max capacity. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 928437f..68f6f12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -41,6 +41,7 @@ public class CSAssignment { private final boolean skipped; private boolean fulfilledReservation; private final AssignmentInformation assignmentInformation; + private boolean increaseAllocation; public CSAssignment(Resource resource, NodeType type) { this(resource, type, null, null, false, false); @@ -138,4 +139,12 @@ public class CSAssignment { public AssignmentInformation getAssignmentInformation() { return this.assignmentInformation; } + + public boolean isIncreasedAllocation() { + return increaseAllocation; + } + + public void setIncreasedAllocation(boolean flag) { + increaseAllocation = flag; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 9855dd4..e90deeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -219,6 +220,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { boolean sortQueues); /** + * We have a reserved increased container in the queue, we need to unreserve + * it. Since we just want to cancel the reserved increase request instead of + * stop the container, we shouldn't call completedContainer for such purpose. 
+ */ + public void unreserveIncreasedContainer(Resource clusterResource, + FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer); + + /** * Get the number of applications in the queue. * @return number of applications */ @@ -313,4 +322,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { * new resource asked */ public void decPendingResource(String nodeLabel, Resource resourceToDec); + + /** + * Decrease container resource in the queue + */ + public void decreaseContainer(Resource clusterResource, + SchedContainerChangeRequest decreaseRequest, + FiCaSchedulerApp app); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/733b0f68/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ad5c76c..465e233 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import 
java.util.Comparator; import java.util.EnumSet; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -98,6 +101,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -840,10 
+845,14 @@ public class CapacityScheduler extends } @Override + // Note: when AM asks to decrease container or release container, we will + // acquire scheduler lock @Lock(Lock.NoLock.class) public Allocation allocate(ApplicationAttemptId applicationAttemptId, - List<ResourceRequest> ask, List<ContainerId> release, - List<String> blacklistAdditions, List<String> blacklistRemovals) { + List<ResourceRequest> ask, List<ContainerId> release, + List<String> blacklistAdditions, List<String> blacklistRemovals, + List<ContainerResourceChangeRequest> increaseRequests, + List<ContainerResourceChangeRequest> decreaseRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { @@ -854,6 +863,14 @@ public class CapacityScheduler extends SchedulerUtils.normalizeRequests( ask, getResourceCalculator(), getClusterResource(), getMinimumResourceCapability(), getMaximumResourceCapability()); + + // Pre-process increase requests + List<SchedContainerChangeRequest> normalizedIncreaseRequests = + checkAndNormalizeContainerChangeRequests(increaseRequests, true); + + // Pre-process decrease requests + List<SchedContainerChangeRequest> normalizedDecreaseRequests = + checkAndNormalizeContainerChangeRequests(decreaseRequests, false); // Release containers releaseContainers(release, application); @@ -870,8 +887,8 @@ public class CapacityScheduler extends return EMPTY_ALLOCATION; } + // Process resource requests if (!ask.isEmpty()) { - if(LOG.isDebugEnabled()) { LOG.debug("allocate: pre-update " + applicationAttemptId + " ask size =" + ask.size()); @@ -888,6 +905,12 @@ public class CapacityScheduler extends application.showRequests(); } } + + // Process increase resource requests + if (application.updateIncreaseRequests(normalizedIncreaseRequests) + && (updateDemandForQueue == null)) { + updateDemandForQueue = (LeafQueue) application.getQueue(); + } if (application.isWaitingForAMContainer(application.getApplicationId())) { // Allocate is for 
AM and update AM blacklist for this @@ -896,6 +919,9 @@ public class CapacityScheduler extends } else { application.updateBlacklist(blacklistAdditions, blacklistRemovals); } + + // Decrease containers + decreaseContainers(normalizedDecreaseRequests, application); allocation = application.getAllocation(getResourceCalculator(), clusterResource, getMinimumResourceCapability()); @@ -957,6 +983,13 @@ public class CapacityScheduler extends for (ContainerStatus launchedContainer : newlyLaunchedContainers) { containerLaunchedOnNode(launchedContainer.getContainerId(), node); } + + // Processing the newly increased containers + List<Container> newlyIncreasedContainers = + nm.pullNewlyIncreasedContainers(); + for (Container container : newlyIncreasedContainers) { + containerIncreasedOnNode(container.getId(), node, container); + } // Process completed containers int releasedContainers = 0; @@ -1442,6 +1475,50 @@ public class CapacityScheduler extends container.getId(), queue.getQueuePath()); } } + + @Lock(CapacityScheduler.class) + @Override + protected synchronized void decreaseContainer( + SchedContainerChangeRequest decreaseRequest, + SchedulerApplicationAttempt attempt) { + RMContainer rmContainer = decreaseRequest.getRMContainer(); + + // Check container status before doing decrease + if (rmContainer.getState() != RMContainerState.RUNNING) { + LOG.info("Trying to decrease a container not in RUNNING state, container=" + + rmContainer + " state=" + rmContainer.getState().name()); + return; + } + + // Delta capacity of this decrease request is 0, this decrease request may + // just to cancel increase request + if (Resources.equals(decreaseRequest.getDeltaCapacity(), Resources.none())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Decrease target resource equals to existing resource for container:" + + decreaseRequest.getContainerId() + + " ignore this decrease request."); + } + return; + } + + // Save resource before decrease + Resource resourceBeforeDecrease = + 
Resources.clone(rmContainer.getContainer().getResource()); + + FiCaSchedulerApp app = (FiCaSchedulerApp)attempt; + LeafQueue queue = (LeafQueue) attempt.getQueue(); + queue.decreaseContainer(clusterResource, decreaseRequest, app); + + // Notify RMNode the container will be decreased + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(), + Arrays.asList(rmContainer.getContainer()))); + + LOG.info("Application attempt " + app.getApplicationAttemptId() + + " decreased container:" + decreaseRequest.getContainerId() + " from " + + resourceBeforeDecrease + " to " + + decreaseRequest.getTargetCapacity()); + } @Lock(Lock.NoLock.class) @VisibleForTesting
