http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java deleted file mode 100644 index 19148d7..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; - -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.util.resource.Resources; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentSkipListMap; - -public class PreemptableQueue { - // Partition -> killable resources and containers - private Map<String, Resource> totalKillableResources = new HashMap<>(); - private Map<String, Map<ContainerId, RMContainer>> killableContainers = - new HashMap<>(); - private PreemptableQueue parent; - - public PreemptableQueue(PreemptableQueue parent) { - this.parent = parent; - } - - public PreemptableQueue(Map<String, Resource> totalKillableResources, - Map<String, Map<ContainerId, RMContainer>> killableContainers) { - this.totalKillableResources = totalKillableResources; - this.killableContainers = killableContainers; - } - - void addKillableContainer(KillableContainer container) { - String partition = container.getNodePartition(); - if (!totalKillableResources.containsKey(partition)) { - totalKillableResources.put(partition, Resources.createResource(0)); - killableContainers.put(partition, - new ConcurrentSkipListMap<ContainerId, RMContainer>()); - } - - RMContainer c = container.getRMContainer(); - Resources.addTo(totalKillableResources.get(partition), - c.getAllocatedResource()); - killableContainers.get(partition).put(c.getContainerId(), c); - - if (null != parent) { - parent.addKillableContainer(container); - } - } - - void removeKillableContainer(KillableContainer container) { - String partition = container.getNodePartition(); - Map<ContainerId, RMContainer> partitionKillableContainers = - killableContainers.get(partition); - if (partitionKillableContainers != null) { - RMContainer rmContainer = partitionKillableContainers.remove( - container.getRMContainer().getContainerId()); - if (null != rmContainer) { - Resources.subtractFrom(totalKillableResources.get(partition), - rmContainer.getAllocatedResource()); - } - } - - if (null != parent) { - parent.removeKillableContainer(container); - } - } - - public Resource getKillableResource(String partition) { - Resource res = totalKillableResources.get(partition); - return res == null ? Resources.none() : res; - } - - @SuppressWarnings("unchecked") - public Map<ContainerId, RMContainer> getKillableContainers(String partition) { - Map<ContainerId, RMContainer> map = killableContainers.get(partition); - return map == null ? Collections.EMPTY_MAP : map; - } - - public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() { - return killableContainers; - } - - Map<String, Resource> getTotalKillableResources() { - return totalKillableResources; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java deleted file mode 100644 index a9f02a5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; -import org.apache.hadoop.yarn.util.resource.Resources; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class PreemptionManager { - private ReentrantReadWriteLock.ReadLock readLock; - private ReentrantReadWriteLock.WriteLock writeLock; - private Map<String, PreemptableQueue> entities = new HashMap<>(); - - public PreemptionManager() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); - } - - public void refreshQueues(CSQueue parent, CSQueue current) { - try { - writeLock.lock(); - PreemptableQueue parentEntity = null; - if (parent != null) { - parentEntity = entities.get(parent.getQueueName()); - } - - if (!entities.containsKey(current.getQueueName())) { - entities.put(current.getQueueName(), - new PreemptableQueue(parentEntity)); - } - - if (current.getChildQueues() != null) { - for (CSQueue child : current.getChildQueues()) { - refreshQueues(current, child); - } - } - } - finally { - writeLock.unlock(); - } - } - - public void addKillableContainer(KillableContainer container) { - try { - writeLock.lock(); - PreemptableQueue entity = entities.get(container.getLeafQueueName()); - if (null != entity) { - entity.addKillableContainer(container); - } - } - finally { - writeLock.unlock(); - } - } - - public void removeKillableContainer(KillableContainer container) { - try { - writeLock.lock(); - PreemptableQueue entity = entities.get(container.getLeafQueueName()); - if (null != entity) { - entity.removeKillableContainer(container); - } - } - finally { - writeLock.unlock(); - } - } - - public void moveKillableContainer(KillableContainer oldContainer, - KillableContainer newContainer) { - // TODO, will be called when partition of the node changed OR - // container moved to different queue - } - - public void updateKillableContainerResource(KillableContainer container, - Resource oldResource, Resource newResource) { - // TODO, will be called when container's resource changed - } - - @VisibleForTesting - public Map<ContainerId, RMContainer> getKillableContainersMap( - String queueName, String partition) { - try { - readLock.lock(); - PreemptableQueue entity = entities.get(queueName); - if (entity != null) { - Map<ContainerId, RMContainer> containers = - entity.getKillableContainers().get(partition); - if (containers != null) { - return containers; - } - } - return Collections.emptyMap(); - } - finally { - readLock.unlock(); - } - } - - public Iterator<RMContainer> getKillableContainers(String queueName, - String partition) { - return getKillableContainersMap(queueName, partition).values().iterator(); - } - - public Resource getKillableResource(String queueName, String partition) { - try { - readLock.lock(); - PreemptableQueue entity = entities.get(queueName); - if (entity != null) { - Resource res = entity.getTotalKillableResources().get(partition); - if (res == null || res.equals(Resources.none())) { - return Resources.none(); - } - return Resources.clone(res); - } - return Resources.none(); - } - finally { - readLock.unlock(); - } - } - - public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() { - try { - readLock.lock(); - Map<String, PreemptableQueue> map = new HashMap<>(); - for (Map.Entry<String, PreemptableQueue> entry : entities.entrySet()) { - String key = entry.getKey(); - PreemptableQueue entity = entry.getValue(); - map.put(key, new PreemptableQueue( - new HashMap<>(entity.getTotalKillableResources()), - new HashMap<>(entity.getKillableContainers()))); - } - return map; - } finally { - readLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java index aad3bc7..5158255 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java @@ -120,9 +120,9 @@ public class AssignmentInformation { } private ContainerId getFirstContainerIdFromOperation(Operation op) { - if (null != operationDetails.get(op)) { + if (null != operationDetails.get(Operation.ALLOCATION)) { List<AssignmentDetails> assignDetails = - operationDetails.get(op); + operationDetails.get(Operation.ALLOCATION); if (!assignDetails.isEmpty()) { return assignDetails.get(0).containerId; } @@ -131,7 +131,7 @@ public class AssignmentInformation { } public ContainerId getFirstAllocatedOrReservedContainerId() { - ContainerId containerId; + ContainerId containerId = null; containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION); if (null != containerId) { return containerId; http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index f474aad..4d563cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMCont import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -95,7 +94,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { * to hold the message if its app doesn't not get container from a node */ private String appSkipNodeDiagnostics; - private CapacitySchedulerContext capacitySchedulerContext; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -140,30 +138,28 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } containerAllocator = new ContainerAllocator(this, rc, rmContext); - - if (scheduler instanceof CapacityScheduler) { - capacitySchedulerContext = (CapacitySchedulerContext) scheduler; - } } - public synchronized boolean containerCompleted(RMContainer rmContainer, + synchronized public boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, String partition) { - ContainerId containerId = rmContainer.getContainerId(); // Remove from the list of containers - if (null == liveContainers.remove(containerId)) { + if (null == liveContainers.remove(rmContainer.getContainerId())) { return false; } - + // Remove from the list of newly allocated containers if found newlyAllocatedContainers.remove(rmContainer); + Container container = rmContainer.getContainer(); + ContainerId containerId = container.getId(); + // Inform the container rmContainer.handle( new RMContainerFinishedEvent(containerId, containerStatus, event)); - containersToPreempt.remove(containerId); + containersToPreempt.remove(rmContainer.getContainerId()); RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, "SchedulerApp", @@ -180,7 +176,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return true; } - public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, + synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, Priority priority, ResourceRequest request, Container container) { @@ -204,9 +200,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Add it to allContainers list. newlyAllocatedContainers.add(rmContainer); - - ContainerId containerId = container.getId(); - liveContainers.put(containerId, rmContainer); + liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate( @@ -219,17 +213,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Inform the container rmContainer.handle( - new RMContainerEvent(containerId, RMContainerEventType.START)); + new RMContainerEvent(container.getId(), RMContainerEventType.START)); if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationAttemptId=" - + containerId.getApplicationAttemptId() - + " container=" + containerId + " host=" + + container.getId().getApplicationAttemptId() + + " container=" + container.getId() + " host=" + container.getNodeId().getHost() + " type=" + type); } RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, "SchedulerApp", - getApplicationId(), containerId); + getApplicationId(), container.getId()); return rmContainer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 1d0e78a..fe6db47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -18,29 +18,22 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; + +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.util.resource.Resources; - -import java.util.HashMap; -import java.util.Map; -import java.util.Set; public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); - private Map<ContainerId, RMContainer> killableContainers = new HashMap<>(); - private Resource totalKillableResources = Resource.newInstance(0, 0); public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName, Set<String> nodeLabels) { @@ -99,6 +92,7 @@ public class FiCaSchedulerNode extends SchedulerNode { @Override public synchronized void unreserveResource( SchedulerApplicationAttempt application) { + // adding NP checks as this can now be called for preemption if (getReservedContainer() != null && getReservedContainer().getContainer() != null @@ -121,55 +115,4 @@ public class FiCaSchedulerNode extends SchedulerNode { } setReservedContainer(null); } - - // According to decisions from preemption policy, mark the container to killable - public synchronized void markContainerToKillable(ContainerId containerId) { - RMContainer c = launchedContainers.get(containerId); - if (c != null && !killableContainers.containsKey(containerId)) { - killableContainers.put(containerId, c); - Resources.addTo(totalKillableResources, c.getAllocatedResource()); - } - } - - // According to decisions from preemption policy, mark the container to - // non-killable - public synchronized void markContainerToNonKillable(ContainerId containerId) { - RMContainer c = launchedContainers.get(containerId); - if (c != null && killableContainers.containsKey(containerId)) { - killableContainers.remove(containerId); - Resources.subtractFrom(totalKillableResources, c.getAllocatedResource()); - } - } - - @Override - protected synchronized void updateResource( - Container container) { - super.updateResource(container); - if (killableContainers.containsKey(container.getId())) { - Resources.subtractFrom(totalKillableResources, container.getResource()); - killableContainers.remove(container.getId()); - } - } - - @Override - protected synchronized void changeContainerResource(ContainerId containerId, - Resource deltaResource, boolean increase) { - super.changeContainerResource(containerId, deltaResource, increase); - - if (killableContainers.containsKey(containerId)) { - if (increase) { - Resources.addTo(totalKillableResources, deltaResource); - } else { - Resources.subtractFrom(totalKillableResources, deltaResource); - } - } - } - - public synchronized Resource getTotalKillableResources() { - return totalKillableResources; - } - - public synchronized Map<ContainerId, RMContainer> getKillableContainers() { - return killableContainers; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 35b7c14..9cf09e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,15 +38,10 @@ public enum SchedulerEventType { // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, - /* Source: SchedulingEditPolicy */ + // Source: SchedulingEditPolicy KILL_RESERVED_CONTAINER, - - // Mark a container for preemption - MARK_CONTAINER_FOR_PREEMPTION, - - // Mark a for-preemption container killable - MARK_CONTAINER_FOR_KILLABLE, - - // Cancel a killable container - MARK_CONTAINER_FOR_NONKILLABLE + MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption + // in the near future + KILL_PREEMPTED_CONTAINER // Kill a container previously marked for + // preemption } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index c944752..d9306dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -59,7 +59,7 @@ public class TestRMDispatcher { rmDispatcher.getEventHandler().handle(event1); ContainerPreemptEvent event2 = new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE); + SchedulerEventType.KILL_PREEMPTED_CONTAINER); rmDispatcher.getEventHandler().handle(event2); ContainerPreemptEvent event3 = new ContainerPreemptEvent(appAttemptId, container, @@ -70,7 +70,7 @@ public class TestRMDispatcher { verify(sched, times(3)).handle(any(SchedulerEvent.class)); verify(sched).killReservedContainer(container); verify(sched).markContainerForPreemption(appAttemptId, container); - verify(sched).markContainerForKillable(container); + verify(sched).killPreemptedContainer(container); } catch (InterruptedException e) { Assert.fail(); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 3057615..028afb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -2352,7 +2352,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() .get(app0.getApplicationId()).getCurrentAppAttempt(); // kill app0-attempt - cs.markContainerForKillable(schedulerAppAttempt.getRMContainer( + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer( app0.getCurrentAppAttempt().getMasterContainer().getId())); am0.waitForState(RMAppAttemptState.FAILED); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 16f3f60..5035afe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -565,7 +566,7 @@ public class TestAMRestart { ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the first attempt; - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); @@ -581,7 +582,7 @@ public class TestAMRestart { // Preempt the second attempt. ContainerId amContainer2 = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2)); + scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2)); am2.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); @@ -676,7 +677,7 @@ public class TestAMRestart { ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Forcibly preempt the am container; - scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); + scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index e9129de..13f267d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,7 +23,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -168,7 +167,6 @@ public class TestProportionalCapacityPreemptionPolicy { when(mCS.getConfiguration()).thenReturn(schedConf); rmContext = mock(RMContext.class); when(mCS.getRMContext()).thenReturn(rmContext); - when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager()); when(rmContext.getNodeLabelManager()).thenReturn(lm); mDisp = mock(EventHandler.class); Dispatcher disp = mock(Dispatcher.class); @@ -291,7 +289,7 @@ public class TestProportionalCapacityPreemptionPolicy { List<ContainerPreemptEvent> events = evtCaptor.getAllValues(); for (ContainerPreemptEvent e : events.subList(20, 20)) { assertEquals(appC, e.getAppId()); - assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType()); + assertEquals(KILL_PREEMPTED_CONTAINER, e.getType()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index 21ea495..512f37c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -124,7 +123,6 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { mClock = mock(Clock.class); cs = mock(CapacityScheduler.class); when(cs.getResourceCalculator()).thenReturn(rc); - when(cs.getPreemptionManager()).thenReturn(new PreemptionManager()); nlm = mock(RMNodeLabelsManager.class); mDisp = mock(EventHandler.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 171196f..0b32676 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -265,7 +264,6 @@ public class TestApplicationLimits { thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); - when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); // Say cluster has 100 nodes of 16G each Resource clusterResource = http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index d8161f8..1569a12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -205,7 +205,7 @@ public class TestApplicationPriority { if (++counter > 2) { break; } - cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check node report, 12 GB used and 4 GB available @@ -512,7 +512,7 @@ public class TestApplicationPriority { if (++counter > 2) { break; } - cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } @@ -542,7 +542,7 @@ public class TestApplicationPriority { if (++counter > 1) { break; } - cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 16ba607..b6c005b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1188,7 +1188,7 @@ public class TestCapacityScheduler { // kill the 3 containers for (Container c : allocatedContainers) { - cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1197,7 +1197,7 @@ public class TestCapacityScheduler { Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); // kill app0-attempt0 AM container - cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0 + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0 .getCurrentAppAttempt().getMasterContainer().getId())); // wait for app0 failed @@ -1220,7 +1220,7 @@ public class TestCapacityScheduler { allocatedContainers = am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); for (Container c : allocatedContainers) { - cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1269,7 +1269,7 @@ public class TestCapacityScheduler { } // Call killContainer to preempt the container - cs.markContainerForKillable(rmContainer); + cs.killPreemptedContainer(rmContainer); Assert.assertEquals(3, requests.size()); for (ResourceRequest request : requests) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java deleted file mode 100644 index bea7797..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java +++ /dev/null @@ -1,677 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; - -import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.MockAM; -import org.apache.hadoop.yarn.server.resourcemanager.MockNM; -import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestCapacitySchedulerPreemption { - private static final Log LOG = LogFactory.getLog( - TestCapacitySchedulerPreemption.class); - - private final int GB = 1024; - - private Configuration conf; - - RMNodeLabelsManager mgr; - - Clock clock; - - @Before - public void setUp() throws Exception { - conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); - conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, - ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class); - conf = TestUtils.getConfigurationWithMultipleQueues(this.conf); - - // Set preemption related configurations - conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL, - 0); - conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, - true); - conf.setFloat( - ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f); - conf.setFloat( - ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f); - mgr = new NullRMNodeLabelsManager(); - mgr.init(this.conf); - clock = mock(Clock.class); - when(clock.getTime()).thenReturn(0L); - } - - private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) { - RMActiveServices activeServices = rm.getRMActiveService(); - SchedulingMonitor mon = null; - for (Service service : activeServices.getServices()) { - if (service instanceof SchedulingMonitor) { - mon = (SchedulingMonitor) service; - break; - } - } - - if (mon != null) { - return mon.getSchedulingEditPolicy(); - } - return null; - } - - @Test (timeout = 60000) - public void testSimplePreemption() throws Exception { - /** - * Test case: Submit two application (app1/app2) to different queues, queue - * structure: - * - * <pre> - * Root - * / | \ - * a b c - * 10 20 70 - * </pre> - * - * 1) Two nodes in the cluster, each of them has 4G. - * - * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no - * more resource available. - * - * 3) app2 submit to queue-c, ask for one 1G container (for AM) - * - * Now the cluster is fulfilled. - * - * 4) app2 asks for another 1G container, system will preempt one container - * from app1, and app2 will receive the preempted container - */ - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - - MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); - MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); - - // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>()); - - // Do allocation 3 times for node1/node2 - for (int i = 0; i < 3; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - } - - // App1 should have 7 containers now, and no available resource for cluster - FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( - am1.getApplicationAttemptId()); - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - - // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); - - // NM1/NM2 has available resource = 0G - Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) - .getUnallocatedResource().getMemory()); - Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) - .getUnallocatedResource().getMemory()); - - // AM asks for a 1 * GB container - am2.allocate(Arrays.asList(ResourceRequest - .newInstance(Priority.newInstance(1), ResourceRequest.ANY, - Resources.createResource(1 * GB), 1)), null); - - // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); - - // Call edit schedule twice, and check if one container from app1 marked - // to be "killable" - editPolicy.editSchedule(); - editPolicy.editSchedule(); - - PreemptionManager pm = cs.getPreemptionManager(); - Map<ContainerId, RMContainer> killableContainers = - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); - Assert.assertEquals(1, killableContainers.size()); - Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() - .getApplicationAttemptId(), am1.getApplicationAttemptId()); - - // Call CS.handle once to see if container preempted - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - - FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( - am2.getApplicationAttemptId()); - - // App1 has 6 containers, and app2 has 2 containers - Assert.assertEquals(6, schedulerApp1.getLiveContainers().size()); - Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); - - rm1.close(); - } - - @Test (timeout = 60000) - public void testPreemptionConsidersNodeLocalityDelay() - throws Exception { - /** - * Test case: same as testSimplePreemption steps 1-3. - * - * Step 4: app2 asks for 1G container with locality specified, so it needs - * to wait for missed-opportunity before get scheduled. - * Check if system waits missed-opportunity before finish killable container - */ - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); - MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); - - // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); - - // Do allocation 3 times for node1/node2 - for (int i = 0; i < 3; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - } - - // App1 should have 7 containers now, and no available resource for cluster - FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( - am1.getApplicationAttemptId()); - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - - // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); - - // NM1/NM2 has available resource = 0G - Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) - .getUnallocatedResource().getMemory()); - Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) - .getUnallocatedResource().getMemory()); - - // AM asks for a 1 * GB container with unknown host and unknown rack - am2.allocate(Arrays.asList(ResourceRequest - .newInstance(Priority.newInstance(1), ResourceRequest.ANY, - Resources.createResource(1 * GB), 1), ResourceRequest - .newInstance(Priority.newInstance(1), "unknownhost", - Resources.createResource(1 * GB), 1), ResourceRequest - .newInstance(Priority.newInstance(1), "/default-rack", - Resources.createResource(1 * GB), 1)), null); - - // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); - - // Call edit schedule twice, and check if one container from app1 marked - // to be "killable" - editPolicy.editSchedule(); - editPolicy.editSchedule(); - - PreemptionManager pm = cs.getPreemptionManager(); - Map<ContainerId, RMContainer> killableContainers = - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); - Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() - .getApplicationAttemptId(), am1.getApplicationAttemptId()); - - // Call CS.handle once to see if container preempted - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - - FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( - am2.getApplicationAttemptId()); - - // App1 has 7 containers, and app2 has 1 containers (no container preempted) - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); - - // Do allocation again, one container will be preempted - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - - // App1 has 6 containers, and app2 has 2 containers (new container allocated) - Assert.assertEquals(6, schedulerApp1.getLiveContainers().size()); - Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); - - rm1.close(); - } - - @Test (timeout = 60000) - public void testPreemptionConsidersHardNodeLocality() - throws Exception { - /** - * Test case: same as testSimplePreemption steps 1-3. - * - * Step 4: app2 asks for 1G container with hard locality specified, and - * asked host is not existed - * Confirm system doesn't preempt any container. - */ - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); - MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); - - // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); - - // Do allocation 3 times for node1/node2 - for (int i = 0; i < 3; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - } - for (int i = 0; i < 3; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - } - - // App1 should have 7 containers now, and no available resource for cluster - FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( - am1.getApplicationAttemptId()); - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - - // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); - - // NM1/NM2 has available resource = 0G - Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) - .getUnallocatedResource().getMemory()); - Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) - .getUnallocatedResource().getMemory()); - - // AM asks for a 1 * GB container for h3 with hard locality, - // h3 doesn't exist in the cluster - am2.allocate(Arrays.asList(ResourceRequest - .newInstance(Priority.newInstance(1), ResourceRequest.ANY, - Resources.createResource(1 * GB), 1, true), ResourceRequest - .newInstance(Priority.newInstance(1), "h3", - Resources.createResource(1 * GB), 1, false), ResourceRequest - .newInstance(Priority.newInstance(1), "/default-rack", - Resources.createResource(1 * GB), 1, false)), null); - - // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); - - // Call edit schedule twice, and check if one container from app1 marked - // to be "killable" - editPolicy.editSchedule(); - editPolicy.editSchedule(); - - PreemptionManager pm = cs.getPreemptionManager(); - Map<ContainerId, RMContainer> killableContainers = - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); - Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() - .getApplicationAttemptId(), am1.getApplicationAttemptId()); - - // Call CS.handle once to see if container preempted - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - - FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( - am2.getApplicationAttemptId()); - - // App1 has 7 containers, and app2 has 1 containers (no container preempted) - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); - - // Do allocation again, nothing will be preempted - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - - // App1 has 7 containers, and app2 has 1 containers (no container allocated) - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); - - rm1.close(); - } - - @Test (timeout = 60000) - public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers() - throws Exception { - /** - * Test case: - * <pre> - * Root - * / | \ - * a b c - * 10 20 70 - * </pre> - * Submit applications to two queues, one uses more than the other, so - * preemption will happen. - * - * Check: - * 1) Killable containers resources will be excluded from PCPP (no duplicated - * container added to killable list) - * 2) When more resources need to be preempted, new containers will be selected - * and killable containers will be considered - */ - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - - // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); - - // Do allocation 6 times for node1 - for (int i = 0; i < 6; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - } - - // App1 should have 7 containers now, and no available resource for cluster - FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( - am1.getApplicationAttemptId()); - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - - // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); - - // NM1 has available resource = 0G - Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) - .getUnallocatedResource().getMemory()); - am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>()); - - // Get edit policy and do one update - ProportionalCapacityPreemptionPolicy editPolicy = - (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); - - // Call edit schedule twice, and check if one container from app1 marked - // to be "killable" - editPolicy.editSchedule(); - editPolicy.editSchedule(); - - PreemptionManager pm = cs.getPreemptionManager(); - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); - - // Check killable containers and to-be-preempted containers in edit policy - Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); - - // Run edit schedule again, confirm status doesn't changed - editPolicy.editSchedule(); - Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); - - // Save current to kill containers - Set<ContainerId> previousKillableContainers = new HashSet<>( - pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL) - .keySet()); - - // Update request resource of c from 1 to 2, so we need to preempt - // one more container - am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>()); - - // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map - // and 1 container in killable map - editPolicy.editSchedule(); - Assert.assertEquals(1, editPolicy.getToPreemptContainers().size()); - - // Call editPolicy.editSchedule() once more, we should have 2 containers killable map - editPolicy.editSchedule(); - Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); - - // Check if previous killable containers included by new killable containers - Map<ContainerId, RMContainer> killableContainers = - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2); - Assert.assertTrue( - Sets.difference(previousKillableContainers, killableContainers.keySet()) - .isEmpty()); - } - - @Test (timeout = 60000) - public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded() - throws Exception { - /** - * Test case: - * <pre> - * Root - * / | \ - * a b c - * 10 20 70 - * </pre> - * Submit applications to two queues, one uses more than the other, so - * preemption will happen. - * - * Check: - * 1) Containers will be marked to killable - * 2) Cancel resource request - * 3) Killable containers will be cancelled from policy and scheduler - */ - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - - // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); - - // Do allocation 6 times for node1 - for (int i = 0; i < 6; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - } - - // App1 should have 7 containers now, and no available resource for cluster - FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( - am1.getApplicationAttemptId()); - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - - // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); - - // NM1 has available resource = 0G - Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) - .getUnallocatedResource().getMemory()); - am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>()); - - // Get edit policy and do one update - ProportionalCapacityPreemptionPolicy editPolicy = - (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); - - // Call edit schedule twice, and check if 3 container from app1 marked - // to be "killable" - editPolicy.editSchedule(); - editPolicy.editSchedule(); - - PreemptionManager pm = cs.getPreemptionManager(); - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3); - - // Change reqeust from 3G to 2G, now we can preempt one less container. (3->2) - am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>()); - editPolicy.editSchedule(); - Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2); - - // Call editSchedule once more to make sure still nothing happens - editPolicy.editSchedule(); - Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2); - } - - @Test (timeout = 60000) - public void testPreemptionConsidersUserLimit() - throws Exception { - /** - * Test case: Submit two application (app1/app2) to different queues, queue - * structure: - * - * <pre> - * Root - * / | \ - * a b c - * 10 20 70 - * </pre> - * - * Queue-c's user-limit-factor = 0.1, so single user cannot allocate >1 containers in queue-c - * - * 1) Two nodes in the cluster, each of them has 4G. - * - * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no - * more resource available. - * - * 3) app2 submit to queue-c, ask for one 1G container (for AM) - * - * Now the cluster is fulfilled. - * - * 4) app2 asks for another 1G container, system will preempt one container - * from app1, and app2 will receive the preempted container - */ - CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf); - csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f); - MockRM rm1 = new MockRM(csConf); - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); - MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); - - // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); - - // Do allocation 3 times for node1/node2 - for (int i = 0; i < 3; i++) { - cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - } - - // App1 should have 7 containers now, and no available resource for cluster - FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( - am1.getApplicationAttemptId()); - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - - // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); - - // NM1/NM2 has available resource = 0G - Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) - .getUnallocatedResource().getMemory()); - Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) - .getUnallocatedResource().getMemory()); - - // AM asks for a 1 * GB container - am2.allocate(Arrays.asList(ResourceRequest - .newInstance(Priority.newInstance(1), ResourceRequest.ANY, - Resources.createResource(1 * GB), 1)), null); - - // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); - - // Call edit schedule twice, and check if no container from app1 marked - // to be "killable" - editPolicy.editSchedule(); - editPolicy.editSchedule(); - - // No preemption happens - PreemptionManager pm = cs.getPreemptionManager(); - Map<ContainerId, RMContainer> killableContainers = - waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0); - Assert.assertEquals(0, killableContainers.size()); - - // Call CS.handle once to see if container preempted - cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - - FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( - am2.getApplicationAttemptId()); - - // App1 has 7 containers, and app2 has 1 containers (nothing preempted) - Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); - Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); - - rm1.close(); - } - - private Map<ContainerId, RMContainer> waitKillableContainersSize( - PreemptionManager pm, String queueName, String partition, - int expectedSize) throws InterruptedException { - Map<ContainerId, RMContainer> killableContainers = - pm.getKillableContainersMap(queueName, partition); - - int wait = 0; - // Wait for at most 5 sec (it should be super fast actually) - while (expectedSize != killableContainers.size() && wait < 500) { - killableContainers = pm.getKillableContainersMap(queueName, partition); - Thread.sleep(10); - wait++; - } - - Assert.assertEquals(expectedSize, killableContainers.size()); - return killableContainers; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 1612201..5169337 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -100,7 +99,6 @@ public class TestChildQueueOrder { when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); - when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); } private FiCaSchedulerApp getMockApplication(int appId, String user) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 87a3d51..69b0813 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -151,7 +150,6 @@ public class TestLeafQueue { thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); - when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); @@ -3094,7 +3092,6 @@ public class TestLeafQueue { Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(2 * GB, 2)); - when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); return csContext; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 1ee201d..bbf6e43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -1677,100 +1676,4 @@ public class TestNodeLabelContainerAllocation { checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), cs.getApplicationAttempt(am1.getApplicationAttemptId())); } - - @Test - public void testParentQueueMaxCapsAreRespected() throws Exception { - /* - * Queue tree: - * Root - * / \ - * A B - * / \ - * A1 A2 - * - * A has 50% capacity and 50% max capacity (of label=x) - * A1/A2 has 50% capacity and 100% max capacity (of label=x) - * Cluster has one node (label=x) with resource = 24G. - * So we can at most use 12G resources under queueA. - */ - CapacitySchedulerConfiguration csConf = - new CapacitySchedulerConfiguration(this.conf); - - // Define top-level queues - csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a", - "b"}); - csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); - - final String A = CapacitySchedulerConfiguration.ROOT + ".a"; - csConf.setCapacity(A, 10); - csConf.setAccessibleNodeLabels(A, toSet("x")); - csConf.setCapacityByLabel(A, "x", 50); - csConf.setMaximumCapacityByLabel(A, "x", 50); - - final String B = CapacitySchedulerConfiguration.ROOT + ".b"; - csConf.setCapacity(B, 90); - csConf.setAccessibleNodeLabels(B, toSet("x")); - csConf.setCapacityByLabel(B, "x", 50); - csConf.setMaximumCapacityByLabel(B, "x", 50); - - // Define 2nd-level queues - csConf.setQueues(A, new String[] { "a1", - "a2"}); - - final String A1 = A + ".a1"; - csConf.setCapacity(A1, 50); - csConf.setAccessibleNodeLabels(A1, toSet("x")); - csConf.setCapacityByLabel(A1, "x", 50); - csConf.setMaximumCapacityByLabel(A1, "x", 100); - csConf.setUserLimitFactor(A1, 100.0f); - - final String A2 = A + ".a2"; - csConf.setCapacity(A2, 50); - csConf.setAccessibleNodeLabels(A2, toSet("x")); - csConf.setCapacityByLabel(A2, "x", 50); - csConf.setMaximumCapacityByLabel(A2, "x", 100); - csConf.setUserLimitFactor(A2, 100.0f); - - // set node -> label - mgr.addToCluserNodeLabels(ImmutableSet.of( - NodeLabel.newInstance("x", false))); - mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); - - // inject node label manager - MockRM rm = new MockRM(csConf) { - @Override - public RMNodeLabelsManager createNodeLabelManager() { - return mgr; - } - }; - - rm.getRMContext().setNodeLabelManager(mgr); - rm.start(); - - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - - MockNM nm1 = - new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService()); - nm1.registerNode(); - - // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB - RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1", "x"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); - am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>(), "x"); - doNMHeartbeat(rm, nm1.getNodeId(), 10); - checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(), - cs.getApplicationAttempt(am1.getApplicationAttemptId())); - - // Try to launch app2 in a2, asked 2GB, should success - RMApp app2 = rm.submitApp(2 * GB, "app", "user", null, "a2", "x"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); - - // am2 asks more resources, cannot success because current used = 9G (app1) - // + 2G (app2) = 11G, and queue's max capacity = 12G - am2.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>(), "x"); - - doNMHeartbeat(rm, nm1.getNodeId(), 10); - checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), - cs.getApplicationAttempt(am2.getApplicationAttemptId())); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 23dc860..f73baa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -93,7 +92,6 @@ public class TestParentQueue { thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getNonPartitionedQueueComparator()). thenReturn(CapacityScheduler.nonPartitionedQueueComparator); - when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa7a4352/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 56facee..2ef5e39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -127,7 +126,6 @@ public class TestReservations { when(csContext.getNonPartitionedQueueComparator()).thenReturn( CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); - when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager( conf);
