http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java new file mode 100644 index 0000000..4e77339 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -0,0 +1,579 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.commons.lang.math.RandomUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.placement + .ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement + .UserGroupMappingPlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common + .QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .SchedulerEvent; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.HashSet; +import 
java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager + .NO_LABEL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CSQueueUtils.EPSILON; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CapacitySchedulerConfiguration.DOT; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class TestCapacitySchedulerAutoCreatedQueueBase { + + private static final Log LOG = LogFactory.getLog( + TestCapacitySchedulerAutoCreatedQueueBase.class); + public final int GB = 1024; + public final static ContainerUpdates NULL_UPDATE_REQUESTS = + new ContainerUpdates(); + + public static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + public static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + public static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + public static final String D = CapacitySchedulerConfiguration.ROOT + ".d"; + public static final String A1 = A + ".a1"; + public static final String A2 = A + ".a2"; + public static final String B1 = B + ".b1"; + public static final String B2 = B + ".b2"; + public static final String B3 = B + ".b3"; + public static final String C1 = C + ".c1"; + public static final String C2 = C + ".c2"; + public static final String C3 = C + ".c3"; + public static final float A_CAPACITY = 20f; + public static final float B_CAPACITY = 40f; + public static final float C_CAPACITY = 20f; + public static final float D_CAPACITY = 20f; + public static final float A1_CAPACITY = 30; + public static final float A2_CAPACITY = 70; + public 
static final float B1_CAPACITY = 60f; + public static final float B2_CAPACITY = 20f; + public static final float B3_CAPACITY = 20f; + public static final float C1_CAPACITY = 20f; + public static final float C2_CAPACITY = 20f; + + public static final String USER = "user_"; + public static final String USER0 = USER + 0; + public static final String USER1 = USER + 1; + public static final String USER3 = USER + 3; + public static final String USER2 = USER + 2; + public static final String PARENT_QUEUE = "c"; + + public static final Set<String> accessibleNodeLabelsOnC = new HashSet<>(); + + public static final String NODEL_LABEL_GPU = "GPU"; + public static final String NODEL_LABEL_SSD = "SSD"; + + protected MockRM mockRM = null; + protected CapacityScheduler cs; + private final TestCapacityScheduler tcs = new TestCapacityScheduler(); + protected SpyDispatcher dispatcher; + private static EventHandler<Event> rmAppEventEventHandler; + + public static class SpyDispatcher extends AsyncDispatcher { + + public static BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>(); + + public static class SpyRMAppEventHandler implements EventHandler<Event> { + public void handle(Event event) { + eventQueue.add(event); + } + } + + @Override + protected void dispatch(Event event) { + eventQueue.add(event); + } + + @Override + public EventHandler<Event> getEventHandler() { + return rmAppEventEventHandler; + } + + void spyOnNextEvent(Event expectedEvent, long timeout) + throws InterruptedException { + + Event event = eventQueue.poll(timeout, TimeUnit.MILLISECONDS); + assertEquals(expectedEvent.getType(), event.getType()); + assertEquals(expectedEvent.getClass(), event.getClass()); + } + } + + @Before + public void setUp() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + setupQueueMappings(conf); + + 
mockRM = new MockRM(conf); + cs = (CapacityScheduler) mockRM.getResourceScheduler(); + + dispatcher = new SpyDispatcher(); + rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler(); + dispatcher.register(RMAppEventType.class, rmAppEventEventHandler); + cs.updatePlacementRules(); + mockRM.start(); + + cs.start(); + } + + public static CapacitySchedulerConfiguration setupQueueMappings( + CapacitySchedulerConfiguration conf) { + + List<String> queuePlacementRules = new ArrayList<>(); + queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); + conf.setQueuePlacementRules(queuePlacementRules); + + //set queue mapping + List<UserGroupMappingPlacementRule.QueueMapping> queueMappings = + new ArrayList<>(); + for (int i = 0; i <= 3; i++) { + //Set C as parent queue name for auto queue creation + UserGroupMappingPlacementRule.QueueMapping userQueueMapping = + new UserGroupMappingPlacementRule.QueueMapping( + UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, + USER + i, getQueueMapping(PARENT_QUEUE, USER + i)); + queueMappings.add(userQueueMapping); + } + + conf.setQueueMappings(queueMappings); + //override with queue mappings + conf.setOverrideWithQueueMappings(true); + return conf; + } + + /** + * @param conf, to be modified + * @return, CS configuration which has C as an auto creation enabled parent + * queue + * <p> + * root / \ \ \ a b c d / \ / | \ a1 a2 b1 + * b2 b3 + */ + public static CapacitySchedulerConfiguration setupQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b", "c", "d" }); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + conf.setCapacity(C, C_CAPACITY); + conf.setCapacity(D, D_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, 
A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[] { "b1", "b2", "b3" }); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + conf.setUserLimitFactor(C, 1.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueConfigCapacity(C, 50.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f); + + LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue"); + + conf.setUserLimitFactor(D, 1.0f); + conf.setAutoCreateChildQueueEnabled(D, true); + conf.setUserLimit(D, 100); + conf.setUserLimitFactor(D, 3.0f); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueConfigCapacity(D, 10.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(D, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(D, 3); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(D, 100); + + conf.set(CapacitySchedulerConfiguration.PREFIX + C + DOT + + CapacitySchedulerConfiguration + .AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX + + DOT + CapacitySchedulerConfiguration.ORDERING_POLICY, + FAIR_APP_ORDERING_POLICY); + + accessibleNodeLabelsOnC.add(NODEL_LABEL_GPU); + accessibleNodeLabelsOnC.add(NODEL_LABEL_SSD); + accessibleNodeLabelsOnC.add(NO_LABEL); + + conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC); + conf.setCapacityByLabel(C, NODEL_LABEL_GPU, 50); + conf.setCapacityByLabel(C, NODEL_LABEL_SSD, 50); + + LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue"); + + return conf; + } + + @After + public void tearDown() throws Exception { + if (mockRM != null) { + mockRM.stop(); 
+ } + } + + protected void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue, + float capacity, float absCapacity, float maxCapacity, + float absMaxCapacity) { + assertEquals(capacity, autoCreatedLeafQueue.getCapacity(), EPSILON); + assertEquals(absCapacity, autoCreatedLeafQueue.getAbsoluteCapacity(), + EPSILON); + assertEquals(maxCapacity, autoCreatedLeafQueue.getMaximumCapacity(), + EPSILON); + assertEquals(absMaxCapacity, + autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), EPSILON); + } + + protected void cleanupQueue(String queueName) throws YarnException { + AutoCreatedLeafQueue queue = (AutoCreatedLeafQueue) cs.getQueue(queueName); + if (queue != null) { + setEntitlement(queue, new QueueEntitlement(0.0f, 0.0f)); + ((ManagedParentQueue) queue.getParent()).removeChildQueue( + queue.getQueueName()); + cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName()); + } + } + + protected ApplicationId submitApp(MockRM rm, CSQueue parentQueue, + String leafQueueName, String user, int expectedNumAppsInParentQueue, + int expectedNumAppsInLeafQueue) throws Exception { + + CapacityScheduler capacityScheduler = + (CapacityScheduler) rm.getResourceScheduler(); + // submit an app + RMApp rmApp = rm.submitApp(GB, "test-auto-queue-activation", user, null, + leafQueueName); + + // check preconditions + List<ApplicationAttemptId> appsInParentQueue = + capacityScheduler.getAppsInQueue(parentQueue.getQueueName()); + assertEquals(expectedNumAppsInParentQueue, appsInParentQueue.size()); + + List<ApplicationAttemptId> appsInLeafQueue = + capacityScheduler.getAppsInQueue(leafQueueName); + assertEquals(expectedNumAppsInLeafQueue, appsInLeafQueue.size()); + + return rmApp.getApplicationId(); + } + + protected List<UserGroupMappingPlacementRule.QueueMapping> setupQueueMapping( + CapacityScheduler newCS, String user, String parentQueue, String queue) { + List<UserGroupMappingPlacementRule.QueueMapping> queueMappings = + new ArrayList<>(); + queueMappings.add(new 
UserGroupMappingPlacementRule.QueueMapping( + UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, user, + getQueueMapping(parentQueue, queue))); + newCS.getConfiguration().setQueueMappings(queueMappings); + return queueMappings; + } + + protected MockRM setupSchedulerInstance() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + List<String> queuePlacementRules = new ArrayList<String>(); + queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); + conf.setQueuePlacementRules(queuePlacementRules); + + setupQueueMappings(conf); + + MockRM newMockRM = new MockRM(conf); + newMockRM.start(); + ((CapacityScheduler) newMockRM.getResourceScheduler()).start(); + return newMockRM; + } + + protected void checkQueueCapacities(CapacityScheduler newCS, float capacityC, + float capacityD) { + CSQueue rootQueue = newCS.getRootQueue(); + CSQueue queueC = tcs.findQueue(rootQueue, C); + CSQueue queueD = tcs.findQueue(rootQueue, D); + CSQueue queueC1 = tcs.findQueue(queueC, C1); + CSQueue queueC2 = tcs.findQueue(queueC, C2); + CSQueue queueC3 = tcs.findQueue(queueC, C3); + + float capC = capacityC / 100.0f; + float capD = capacityD / 100.0f; + + tcs.checkQueueCapacity(queueC, capC, capC, 1.0f, 1.0f); + tcs.checkQueueCapacity(queueD, capD, capD, 1.0f, 1.0f); + tcs.checkQueueCapacity(queueC1, C1_CAPACITY / 100.0f, + (C1_CAPACITY / 100.0f) * capC, 1.0f, 1.0f); + tcs.checkQueueCapacity(queueC2, C2_CAPACITY / 100.0f, + (C2_CAPACITY / 100.0f) * capC, 1.0f, 1.0f); + + if (queueC3 != null) { + ManagedParentQueue parentQueue = (ManagedParentQueue) queueC; + QueueCapacities cap = + parentQueue.getLeafQueueTemplate().getQueueCapacities(); + tcs.checkQueueCapacity(queueC3, cap.getCapacity(), + (cap.getCapacity()) * capC, 1.0f, 1.0f); + } + } + + static String getQueueMapping(String parentQueue, String leafQueue) 
{ + return parentQueue + DOT + leafQueue; + } + + protected ApplicationAttemptId submitApp(CapacityScheduler newCS, String user, + String queue, String parentQueue) { + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, queue, user, + new ApplicationPlacementContext(queue, parentQueue)); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + SchedulerEvent addAttemptEvent = new AppAttemptAddedSchedulerEvent( + appAttemptId, false); + newCS.handle(addAppEvent); + newCS.handle(addAttemptEvent); + return appAttemptId; + } + + protected RMApp submitApp(String user, String queue, String nodeLabel) + throws Exception { + RMApp app = mockRM.submitApp(GB, + "test-auto-queue-creation" + RandomUtils.nextInt(100), user, null, + queue, nodeLabel); + Assert.assertEquals(app.getAmNodeLabelExpression(), nodeLabel); + // check preconditions + List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE); + assertEquals(1, appsInC.size()); + assertNotNull(cs.getQueue(queue)); + + return app; + } + + void setEntitlement(AutoCreatedLeafQueue queue, + QueueEntitlement entitlement) { + queue.setCapacity(entitlement.getCapacity()); + queue.setAbsoluteCapacity( + queue.getParent().getAbsoluteCapacity() * entitlement.getCapacity()); + // note: we currently set maxCapacity to capacity + // this might be revised later + queue.setMaxCapacity(entitlement.getMaxCapacity()); + } + + protected void validateUserAndAppLimits( + AutoCreatedLeafQueue autoCreatedLeafQueue, int maxApps, + int maxAppsPerUser) { + assertEquals(maxApps, autoCreatedLeafQueue.getMaxApplications()); + assertEquals(maxAppsPerUser, + autoCreatedLeafQueue.getMaxApplicationsPerUser()); + } + + protected void validateInitialQueueEntitlement(CSQueue parentQueue, + String leafQueueName, float expectedTotalChildQueueAbsCapacity) + throws SchedulerDynamicEditException { + validateInitialQueueEntitlement(cs, 
parentQueue, leafQueueName, + expectedTotalChildQueueAbsCapacity); + } + + protected void validateInitialQueueEntitlement( + CapacityScheduler capacityScheduler, CSQueue parentQueue, + String leafQueueName, float expectedTotalChildQueueAbsCapacity) + throws SchedulerDynamicEditException { + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + + GuaranteedOrZeroCapacityOverTimePolicy policy = + (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue + .getAutoCreatedQueueManagementPolicy(); + + assertEquals(expectedTotalChildQueueAbsCapacity, + policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON); + + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName); + + for (String label : accessibleNodeLabelsOnC) { + validateCapacitiesByLabel(autoCreateEnabledParentQueue, leafQueue, label); + } + + assertEquals(true, policy.isActive(leafQueue)); + } + + protected void validateCapacitiesByLabel( + ManagedParentQueue autoCreateEnabledParentQueue, + AutoCreatedLeafQueue leafQueue, String label) { + assertEquals( + autoCreateEnabledParentQueue.getLeafQueueTemplate().getQueueCapacities() + .getCapacity(), leafQueue.getQueueCapacities().getCapacity(label), + EPSILON); + assertEquals( + autoCreateEnabledParentQueue.getLeafQueueTemplate().getQueueCapacities() + .getMaximumCapacity(), + leafQueue.getQueueCapacities().getMaximumCapacity(label), EPSILON); + } + + protected void validateActivatedQueueEntitlement(CSQueue parentQueue, + String leafQueueName, float expectedTotalChildQueueAbsCapacity, + List<QueueManagementChange> queueManagementChanges) + throws SchedulerDynamicEditException { + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + + GuaranteedOrZeroCapacityOverTimePolicy policy = + (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue + .getAutoCreatedQueueManagementPolicy(); + + QueueCapacities cap = 
autoCreateEnabledParentQueue.getLeafQueueTemplate() + .getQueueCapacities(); + QueueEntitlement expectedEntitlement = new QueueEntitlement( + cap.getCapacity(), cap.getMaximumCapacity()); + + //validate capacity + validateQueueEntitlements(leafQueueName, expectedEntitlement, + queueManagementChanges); + + //validate parent queue state + assertEquals(expectedTotalChildQueueAbsCapacity, + policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON); + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( + leafQueueName); + + //validate leaf queue state + assertEquals(true, policy.isActive(leafQueue)); + } + + protected void validateDeactivatedQueueEntitlement(CSQueue parentQueue, + String leafQueueName, float expectedTotalChildQueueAbsCapacity, + List<QueueManagementChange> queueManagementChanges) + throws SchedulerDynamicEditException { + QueueEntitlement expectedEntitlement = new QueueEntitlement(0.0f, 1.0f); + + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( + leafQueueName); + + GuaranteedOrZeroCapacityOverTimePolicy policy = + (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue + .getAutoCreatedQueueManagementPolicy(); + + //validate parent queue state + assertEquals(expectedTotalChildQueueAbsCapacity, + policy.getAbsoluteActivatedChildQueueCapacity(), EPSILON); + + //validate leaf queue state + assertEquals(false, policy.isActive(leafQueue)); + + //validate capacity + validateQueueEntitlements(leafQueueName, expectedEntitlement, + queueManagementChanges); + } + + private void validateQueueEntitlements(String leafQueueName, + QueueEntitlement expectedEntitlement, + List<QueueManagementChange> queueEntitlementChanges) { + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( + leafQueueName); + validateQueueEntitlementChangesForLeafQueue(leafQueue, expectedEntitlement, + queueEntitlementChanges); + } + + 
private void validateQueueEntitlementChangesForLeafQueue(CSQueue leafQueue, + QueueEntitlement expectedQueueEntitlement, + final List<QueueManagementChange> queueEntitlementChanges) { + boolean found = false; + for (QueueManagementChange entitlementChange : queueEntitlementChanges) { + if (leafQueue.getQueueName().equals( + entitlementChange.getQueue().getQueueName())) { + + AutoCreatedLeafQueueConfig updatedQueueTemplate = + entitlementChange.getUpdatedQueueTemplate(); + + for (String label : accessibleNodeLabelsOnC) { + QueueEntitlement newEntitlement = new QueueEntitlement( + updatedQueueTemplate.getQueueCapacities().getCapacity(label), + updatedQueueTemplate.getQueueCapacities() + .getMaximumCapacity(label)); + assertEquals(expectedQueueEntitlement, newEntitlement); + } + found = true; + break; + } + } + if (!found) { + fail("Could not find the specified leaf queue in entitlement changes : " + + leafQueue.getQueueName()); + } + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index 7090bc9..049a932 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -26,51 +26,54 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import 
org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; -import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; -import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.placement + .ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt + .RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; + +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common + .QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event + .SchedulerEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy + .FairOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.security + .ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security + .NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security + .RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; +import static org.apache.hadoop.yarn.server.resourcemanager.placement + .UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CSQueueUtils.EPSILON; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -79,198 +82,14 @@ import static 
org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** - * Tests for creation and reinitilization of auto created leaf queues + * Tests for creation and reinitialization of auto created leaf queues * under a ManagedParentQueue. */ -public class TestCapacitySchedulerAutoQueueCreation { - - private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); - private final int GB = 1024; - private final static ContainerUpdates NULL_UPDATE_REQUESTS = - new ContainerUpdates(); - - private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; - private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; - private static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; - private static final String D = CapacitySchedulerConfiguration.ROOT + ".d"; - private static final String A1 = A + ".a1"; - private static final String A2 = A + ".a2"; - private static final String B1 = B + ".b1"; - private static final String B2 = B + ".b2"; - private static final String B3 = B + ".b3"; - private static final String C1 = C + ".c1"; - private static final String C2 = C + ".c2"; - private static final String C3 = C + ".c3"; - private static float A_CAPACITY = 20f; - private static float B_CAPACITY = 40f; - private static float C_CAPACITY = 20f; - private static float D_CAPACITY = 20f; - private static float A1_CAPACITY = 30; - private static float A2_CAPACITY = 70; - private static float B1_CAPACITY = 60f; - private static float B2_CAPACITY = 20f; - private static float B3_CAPACITY = 20f; - private static float C1_CAPACITY = 20f; - private static float C2_CAPACITY = 20f; - - private static String USER = "user_"; - private static String USER0 = USER + 0; - private static String USER2 = USER + 2; - private static String PARENT_QUEUE = "c"; - - private MockRM mockRM = null; - - private CapacityScheduler cs; - - private final TestCapacityScheduler tcs = new TestCapacityScheduler(); - - private static SpyDispatcher dispatcher; 
- - private static EventHandler<Event> rmAppEventEventHandler; - - private static class SpyDispatcher extends AsyncDispatcher { - - private static BlockingQueue<Event> eventQueue = - new LinkedBlockingQueue<>(); - - private static class SpyRMAppEventHandler implements EventHandler<Event> { - public void handle(Event event) { - eventQueue.add(event); - } - } - - @Override - protected void dispatch(Event event) { - eventQueue.add(event); - } - - @Override - public EventHandler<Event> getEventHandler() { - return rmAppEventEventHandler; - } - - void spyOnNextEvent(Event expectedEvent, long timeout) - throws InterruptedException { - - Event event = eventQueue.poll(timeout, TimeUnit.MILLISECONDS); - assertEquals(expectedEvent.getType(), event.getType()); - assertEquals(expectedEvent.getClass(), event.getClass()); - } - } - - @Before - public void setUp() throws Exception { - CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); - setupQueueConfiguration(conf); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - - List<String> queuePlacementRules = new ArrayList<>(); - queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); - conf.setQueuePlacementRules(queuePlacementRules); - - setupQueueMappings(conf); - - mockRM = new MockRM(conf); - cs = (CapacityScheduler) mockRM.getResourceScheduler(); - - dispatcher = new SpyDispatcher(); - rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler(); - dispatcher.register(RMAppEventType.class, rmAppEventEventHandler); - cs.updatePlacementRules(); - mockRM.start(); - - cs.start(); - } - - private CapacitySchedulerConfiguration setupQueueMappings( - CapacitySchedulerConfiguration conf) { - - //set queue mapping - List<UserGroupMappingPlacementRule.QueueMapping> queueMappings = - new ArrayList<>(); - for (int i = 0; i <= 3; i++) { - //Set C as parent queue name for auto queue creation - UserGroupMappingPlacementRule.QueueMapping 
userQueueMapping = - new UserGroupMappingPlacementRule.QueueMapping( - UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, - USER + i, getQueueMapping(PARENT_QUEUE, USER + i)); - queueMappings.add(userQueueMapping); - } +public class TestCapacitySchedulerAutoQueueCreation + extends TestCapacitySchedulerAutoCreatedQueueBase { - conf.setQueueMappings(queueMappings); - //override with queue mappings - conf.setOverrideWithQueueMappings(true); - return conf; - } - - /** - * @param conf, to be modified - * @return, CS configuration which has C - * as an auto creation enabled parent queue - * <p> - * root - * / \ \ \ - * a b c d - * / \ / | \ - * a1 a2 b1 b2 b3 - */ - private CapacitySchedulerConfiguration setupQueueConfiguration( - CapacitySchedulerConfiguration conf) { - - //setup new queues with one of them auto enabled - // Define top-level queues - // Set childQueue for root - conf.setQueues(CapacitySchedulerConfiguration.ROOT, - new String[] { "a", "b", "c", "d" }); - - conf.setCapacity(A, A_CAPACITY); - conf.setCapacity(B, B_CAPACITY); - conf.setCapacity(C, C_CAPACITY); - conf.setCapacity(D, D_CAPACITY); - - // Define 2nd-level queues - conf.setQueues(A, new String[] { "a1", "a2" }); - conf.setCapacity(A1, A1_CAPACITY); - conf.setUserLimitFactor(A1, 100.0f); - conf.setCapacity(A2, A2_CAPACITY); - conf.setUserLimitFactor(A2, 100.0f); - - conf.setQueues(B, new String[] { "b1", "b2", "b3" }); - conf.setCapacity(B1, B1_CAPACITY); - conf.setUserLimitFactor(B1, 100.0f); - conf.setCapacity(B2, B2_CAPACITY); - conf.setUserLimitFactor(B2, 100.0f); - conf.setCapacity(B3, B3_CAPACITY); - conf.setUserLimitFactor(B3, 100.0f); - - conf.setUserLimitFactor(C, 1.0f); - conf.setAutoCreateChildQueueEnabled(C, true); - - //Setup leaf queue template configs - conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f); - conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f); - - LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue"); - - 
conf.setUserLimitFactor(D, 1.0f); - conf.setAutoCreateChildQueueEnabled(D, true); - - //Setup leaf queue template configs - conf.setAutoCreatedLeafQueueTemplateCapacity(D, 10.0f); - conf.setAutoCreatedLeafQueueTemplateMaxCapacity(D, 100.0f); - - LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue"); - - return conf; - } - - @After - public void tearDown() throws Exception { - if (mockRM != null) { - mockRM.stop(); - } - } + private static final Log LOG = LogFactory.getLog( + TestCapacitySchedulerAutoQueueCreation.class); @Test(timeout = 10000) public void testAutoCreateLeafQueueCreation() throws Exception { @@ -289,7 +108,11 @@ public class TestCapacitySchedulerAutoQueueCreation { ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( PARENT_QUEUE); assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); - validateCapacities(autoCreatedLeafQueue); + validateInitialQueueEntitlement(parentQueue, USER0, 0.1f); + validateUserAndAppLimits(autoCreatedLeafQueue, 1000, 1000); + + assertTrue(autoCreatedLeafQueue + .getOrderingPolicy() instanceof FairOrderingPolicy); } finally { cleanupQueue(USER0); } @@ -297,7 +120,6 @@ public class TestCapacitySchedulerAutoQueueCreation { @Test public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { - try { String host = "127.0.0.1"; RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, @@ -306,20 +128,28 @@ public class TestCapacitySchedulerAutoQueueCreation { // submit an app - RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0, + RMApp app1 = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0, null, USER0); + + RMApp app2 = mockRM.submitApp(GB, "test-auto-queue-creation-2", USER1, + null, USER1); // check preconditions List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE); - assertEquals(1, appsInC.size()); + assertEquals(2, appsInC.size()); assertNotNull(cs.getQueue(USER0)); + assertNotNull(cs.getQueue(USER1)); - 
AutoCreatedLeafQueue autoCreatedLeafQueue = - (AutoCreatedLeafQueue) cs.getQueue(USER0); + AutoCreatedLeafQueue user0Queue = (AutoCreatedLeafQueue) cs.getQueue( + USER0); + AutoCreatedLeafQueue user1Queue = (AutoCreatedLeafQueue) cs.getQueue( + USER1); ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( PARENT_QUEUE); - assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); - validateCapacities(autoCreatedLeafQueue); + assertEquals(parentQueue, user0Queue.getParent()); + assertEquals(parentQueue, user1Queue.getParent()); + validateInitialQueueEntitlement(parentQueue, USER0, 0.2f); + validateInitialQueueEntitlement(parentQueue, USER1, 0.2f); ApplicationAttemptId appAttemptId = appsInC.get(0); @@ -337,7 +167,7 @@ public class TestCapacitySchedulerAutoQueueCreation { CapacityScheduler.schedule(cs); //change state to draining - autoCreatedLeafQueue.stopQueue(); + user0Queue.stopQueue(); cs.killAllAppsInQueue(USER0); @@ -346,80 +176,24 @@ public class TestCapacitySchedulerAutoQueueCreation { mockRM.waitForState(appAttemptId.getApplicationId(), RMAppState.KILLED); //change state to stopped - autoCreatedLeafQueue.stopQueue(); + user0Queue.stopQueue(); assertEquals(QueueState.STOPPED, - autoCreatedLeafQueue.getQueueInfo().getQueueState()); + user0Queue.getQueueInfo().getQueueState()); cs.reinitialize(cs.getConf(), mockRM.getRMContext()); - AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( - USER0); - validateCapacities(leafQueue); - - } finally { - cleanupQueue(USER0); - } - } - - @Test - public void testRefreshQueuesWithAutoCreatedLeafQueues() throws Exception { - - MockRM newMockRM = setupSchedulerInstance(); - try { - CapacityScheduler newCS = - (CapacityScheduler) newMockRM.getResourceScheduler(); - CapacitySchedulerConfiguration conf = newCS.getConfiguration(); - - // Test add one auto created queue dynamically and manually modify - // capacity - ManagedParentQueue parentQueue = (ManagedParentQueue) newCS.getQueue("c"); -
AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1", - parentQueue); - newCS.addQueue(c1); - c1.setEntitlement(new QueueEntitlement(C1_CAPACITY / 100, 1f)); - - // Test add another auto created queue and use setEntitlement to modify - // capacity - AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2", - (ManagedParentQueue) newCS.getQueue("c")); - newCS.addQueue(c2); - newCS.setEntitlement("c2", new QueueEntitlement(C2_CAPACITY / 100, 1f)); - - // Verify all allocations match - checkQueueCapacities(newCS, C_CAPACITY, D_CAPACITY); - - // Reinitialize and verify all dynamic queued survived - - conf.setCapacity(A, 20f); - conf.setCapacity(B, 20f); - conf.setCapacity(C, 40f); - conf.setCapacity(D, 20f); - newCS.reinitialize(conf, newMockRM.getRMContext()); - - checkQueueCapacities(newCS, 40f, 20f); - - //chnage parent template configs and reinitialize - conf.setAutoCreatedLeafQueueTemplateCapacity(C, 30.0f); - conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f); - newCS.reinitialize(conf, newMockRM.getRMContext()); + AutoCreatedLeafQueue user0QueueReinited = + (AutoCreatedLeafQueue) cs.getQueue(USER0); - ManagedParentQueue c = (ManagedParentQueue) newCS.getQueue("c"); - AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3", c); - newCS.addQueue(c3); + validateCapacities(user0QueueReinited, 0.0f, 0.0f, 1.0f, 1.0f); - AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate - leafQueueTemplate = parentQueue.getLeafQueueTemplate(); - QueueCapacities cap = leafQueueTemplate.getQueueCapacities(); - c3.setEntitlement( - new QueueEntitlement(cap.getCapacity(), cap.getMaximumCapacity())); - newCS.reinitialize(conf, newMockRM.getRMContext()); + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( + USER1); + validateInitialQueueEntitlement(parentQueue, leafQueue.getQueueName(), + 0.1f); - checkQueueCapacities(newCS, 40f, 20f); } finally { - if (newMockRM != null) { - ((CapacityScheduler) 
newMockRM.getResourceScheduler()).stop(); - newMockRM.stop(); - } + cleanupQueue(USER0); } } @@ -460,7 +234,7 @@ public class TestCapacitySchedulerAutoQueueCreation { CapacitySchedulerConfiguration newConf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(newConf); - newConf.setAutoCreatedLeafQueueTemplateCapacity(A1, A1_CAPACITY / 10); + newConf.setAutoCreatedLeafQueueConfigCapacity(A1, A1_CAPACITY / 10); newConf.setAutoCreateChildQueueEnabled(A1, true); newCS.setConf(new YarnConfiguration()); @@ -490,7 +264,7 @@ public class TestCapacitySchedulerAutoQueueCreation { CapacitySchedulerConfiguration newConf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(newConf); - newConf.setAutoCreatedLeafQueueTemplateCapacity(A, A_CAPACITY / 10); + newConf.setAutoCreatedLeafQueueConfigCapacity(A, A_CAPACITY / 10); newConf.setAutoCreateChildQueueEnabled(A, true); newCS.setConf(new YarnConfiguration()); @@ -531,39 +305,6 @@ public class TestCapacitySchedulerAutoQueueCreation { assertEquals(RMAppState.FAILED, app.getState()); } - private void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue) { - assertEquals(autoCreatedLeafQueue.getCapacity(), 0.0f, EPSILON); - assertEquals(autoCreatedLeafQueue.getAbsoluteCapacity(), 0.0f, EPSILON); - assertEquals(autoCreatedLeafQueue.getMaximumCapacity(), 0.0f, EPSILON); - assertEquals(autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), 0.0f, - EPSILON); - int maxAppsForAutoCreatedQueues = (int) ( - CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS - * autoCreatedLeafQueue.getParent().getAbsoluteCapacity()); - assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(), - maxAppsForAutoCreatedQueues); - assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(), - (int) (maxAppsForAutoCreatedQueues * (cs.getConfiguration() - .getUserLimitFactor( - autoCreatedLeafQueue.getParent().getQueuePath())))); - } - - private void cleanupQueue(String queueName) throws YarnException { - 
AutoCreatedLeafQueue queue = (AutoCreatedLeafQueue) cs.getQueue(queueName); - if (queue != null) { - queue.setEntitlement(new QueueEntitlement(0.0f, 0.0f)); - ((ManagedParentQueue) queue.getParent()).removeChildQueue( - queue.getQueueName()); - cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName()); - } else{ - throw new YarnException("Queue does not exist " + queueName); - } - } - - String getQueueMapping(String parentQueue, String leafQueue) { - return parentQueue + DOT + leafQueue; - } - @Test(timeout = 10000) public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping() throws Exception { @@ -586,8 +327,7 @@ public class TestCapacitySchedulerAutoQueueCreation { //expected exception assertTrue(e.getMessage().contains( "invalid parent queue which does not have auto creation of leaf " - + "queues enabled [" - + "a" + "]")); + + "queues enabled [" + "a" + "]")); } //"a" is not auto create enabled and app_user does not exist as a leaf @@ -650,9 +390,6 @@ public class TestCapacitySchedulerAutoQueueCreation { (CapacityScheduler) newMockRM.getResourceScheduler(); try { - newMockRM.start(); - newCS.start(); - submitApp(newCS, USER0, USER0, PARENT_QUEUE); assertNotNull(newCS.getQueue(USER0)); @@ -700,12 +437,16 @@ public class TestCapacitySchedulerAutoQueueCreation { AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1", parentQueue); newCS.addQueue(c1); - c1.setEntitlement(new QueueEntitlement(0.5f, 1f)); + c1.setCapacity(0.5f); + c1.setAbsoluteCapacity(c1.getParent().getAbsoluteCapacity() * 1f); + c1.setMaxCapacity(1f); + + setEntitlement(c1, new QueueEntitlement(0.5f, 1f)); AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2", parentQueue); newCS.addQueue(c2); - c2.setEntitlement(new QueueEntitlement(0.5f, 1f)); + setEntitlement(c2, new QueueEntitlement(0.5f, 1f)); try { AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3", @@ -723,72 +464,160 @@ public class TestCapacitySchedulerAutoQueueCreation { } } - 
private List<UserGroupMappingPlacementRule.QueueMapping> setupQueueMapping( - CapacityScheduler newCS, String user, String parentQueue, String queue) { - List<UserGroupMappingPlacementRule.QueueMapping> queueMappings = - new ArrayList<>(); - queueMappings.add(new UserGroupMappingPlacementRule.QueueMapping( - UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, user, - getQueueMapping(parentQueue, queue))); - newCS.getConfiguration().setQueueMappings(queueMappings); - return queueMappings; - } + @Test + public void testAutoCreatedQueueActivationDeactivation() throws Exception { + + try { + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, + host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + //submit app1 as USER1 + submitApp(mockRM, parentQueue, USER1, USER1, 1, 1); + validateInitialQueueEntitlement(parentQueue, USER1, 0.1f); + + //submit another app2 as USER2 + ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2, + 1); + validateInitialQueueEntitlement(parentQueue, USER2, 0.2f); + + //submit another app3 as USER1 + submitApp(mockRM, parentQueue, USER1, USER1, 3, 2); + + //validate total activated abs capacity remains the same + GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy = + (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) + parentQueue) + .getAutoCreatedQueueManagementPolicy(); + assertEquals(autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON); + + //submit user_3 app. 
This can't be scheduled since there is no capacity + submitApp(mockRM, parentQueue, USER3, USER3, 4, 1); + final CSQueue user3LeafQueue = cs.getQueue(USER3); + validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f, + 1.0f, 1.0f); - private MockRM setupSchedulerInstance() { - CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); - setupQueueConfiguration(conf); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); + assertEquals(autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON); - List<String> queuePlacementRules = new ArrayList<String>(); - queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); - conf.setQueuePlacementRules(queuePlacementRules); + //deactivate USER2 queue + cs.killAllAppsInQueue(USER2); + mockRM.waitForState(user2AppId, RMAppState.KILLED); - setupQueueMappings(conf); + //Verify if USER_2 can be deactivated since it has no pending apps + List<QueueManagementChange> queueManagementChanges = + autoCreatedQueueManagementPolicy.computeQueueManagementChanges(); - MockRM newMockRM = new MockRM(conf); - return newMockRM; + ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue; + managedParentQueue.validateAndApplyQueueManagementChanges( + queueManagementChanges); + + validateDeactivatedQueueEntitlement(parentQueue, USER2, 0.2f, + queueManagementChanges); + + //USER_3 should now get activated + validateActivatedQueueEntitlement(parentQueue, USER3, 0.2f, + queueManagementChanges); + + } finally { + cleanupQueue(USER1); + cleanupQueue(USER2); + cleanupQueue(USER3); + } } - void checkQueueCapacities(CapacityScheduler newCS, float capacityC, - float capacityD) { - CSQueue rootQueue = newCS.getRootQueue(); - CSQueue queueC = tcs.findQueue(rootQueue, C); - CSQueue queueD = tcs.findQueue(rootQueue, D); - CSQueue queueC1 = tcs.findQueue(queueC, C1); - CSQueue queueC2 = tcs.findQueue(queueC, C2); - CSQueue
queueC3 = tcs.findQueue(queueC, C3); - - float capC = capacityC / 100.0f; - float capD = capacityD / 100.0f; - - tcs.checkQueueCapacity(queueC, capC, capC, 1.0f, 1.0f); - tcs.checkQueueCapacity(queueD, capD, capD, 1.0f, 1.0f); - tcs.checkQueueCapacity(queueC1, C1_CAPACITY / 100.0f, - (C1_CAPACITY / 100.0f) * capC, 1.0f, 1.0f); - tcs.checkQueueCapacity(queueC2, C2_CAPACITY / 100.0f, - (C2_CAPACITY / 100.0f) * capC, 1.0f, 1.0f); - - if (queueC3 != null) { - ManagedParentQueue parentQueue = (ManagedParentQueue) queueC; - QueueCapacities cap = - parentQueue.getLeafQueueTemplate().getQueueCapacities(); - tcs.checkQueueCapacity(queueC3, cap.getCapacity(), - (cap.getCapacity()) * capC, 1.0f, 1.0f); + @Test + public void testAutoCreatedQueueInheritsNodeLabels() throws Exception { + + try { + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, + host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + submitApp(USER1, USER1, NODEL_LABEL_GPU); + //submit app1 as USER1 + validateInitialQueueEntitlement(parentQueue, USER1, 0.1f); + } finally { + cleanupQueue(USER1); } } - ApplicationAttemptId submitApp(CapacityScheduler newCS, String user, - String queue, String parentQueue) { - ApplicationId appId = BuilderUtils.newApplicationId(1, 1); - SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, queue, user, - new ApplicationPlacementContext(queue, parentQueue)); - ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( - appId, 1); - SchedulerEvent addAttemptEvent = new AppAttemptAddedSchedulerEvent( - appAttemptId, false); - newCS.handle(addAppEvent); - newCS.handle(addAttemptEvent); - return appAttemptId; + @Test + public void testReinitializeQueuesWithAutoCreatedLeafQueues() + throws Exception { + + MockRM newMockRM = setupSchedulerInstance(); + try { + CapacityScheduler newCS = + (CapacityScheduler) newMockRM.getResourceScheduler(); + 
CapacitySchedulerConfiguration conf = newCS.getConfiguration(); + + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, + host); + newCS.handle(new NodeAddedSchedulerEvent(node)); + + CSQueue parentQueue = newCS.getQueue(PARENT_QUEUE); + + //submit app1 as USER1 + submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1); + validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f); + + //submit another app2 as USER2 + ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2, + 2, 1); + validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f); + + //update parent queue capacity + conf.setCapacity(C, 30f); + conf.setCapacity(D, 10f); + conf.setMaximumCapacity(C, 50f); + + newCS.reinitialize(conf, newMockRM.getRMContext()); + + // validate that leaf queues abs capacity is now changed + AutoCreatedLeafQueue user0Queue = (AutoCreatedLeafQueue) newCS.getQueue( + USER1); + validateCapacities(user0Queue, 0.5f, 0.15f, 1.0f, 0.5f); + validateUserAndAppLimits(user0Queue, 1500, 1500); + + //update leaf queue template capacities + conf.setAutoCreatedLeafQueueConfigCapacity(C, 30f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 40f); + + newCS.reinitialize(conf, newMockRM.getRMContext()); + validateCapacities(user0Queue, 0.3f, 0.09f, 0.4f, 0.2f); + validateUserAndAppLimits(user0Queue, 900, 900); + + //submit app1 as USER3 + submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1); + validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f); + AutoCreatedLeafQueue user3Queue = (AutoCreatedLeafQueue) newCS.getQueue( + USER3); + validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f); + validateUserAndAppLimits(user3Queue, 900, 900); + + //submit another app as USER3 - is already activated.
there should be no diff + // in capacities + submitApp(newMockRM, parentQueue, USER3, USER3, 4, 2); + validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f); + validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f); + validateUserAndAppLimits(user3Queue, 900, 900); + + } finally { + cleanupQueue(USER1); + cleanupQueue(USER2); + if (newMockRM != null) { + ((CapacityScheduler) newMockRM.getResourceScheduler()).stop(); + newMockRM.stop(); + } + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java index 9425d5e..9aba30c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java @@ -77,21 +77,21 @@ public class TestCapacitySchedulerDynamicBehavior { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); //set default queue capacity to zero - ((AutoCreatedLeafQueue) cs + ((ReservationQueue) cs .getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX)) .setEntitlement( 
new QueueEntitlement(0f, 1f)); // Test add one reservation dynamically and manually modify capacity - AutoCreatedLeafQueue a1 = - new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); cs.addQueue(a1); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); // Test add another reservation queue and use setEntitlement to modify // capacity - AutoCreatedLeafQueue a2 = - new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + ReservationQueue a2 = + new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); cs.addQueue(a2); cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f)); @@ -113,8 +113,8 @@ public class TestCapacitySchedulerDynamicBehavior { try { // Test invalid addition (adding non-zero size queue) - AutoCreatedLeafQueue a1 = - new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); cs.addQueue(a1); fail(); @@ -123,11 +123,11 @@ public class TestCapacitySchedulerDynamicBehavior { } // Test add one reservation dynamically and manually modify capacity - AutoCreatedLeafQueue a1 = - new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); cs.addQueue(a1); //set default queue capacity to zero - ((AutoCreatedLeafQueue) cs + ((ReservationQueue) cs .getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX)) .setEntitlement( new QueueEntitlement(0f, 1f)); @@ -135,8 +135,8 @@ public class TestCapacitySchedulerDynamicBehavior { // Test add another reservation queue and use setEntitlement to modify // capacity - AutoCreatedLeafQueue a2 = - new AutoCreatedLeafQueue(cs, "a2", (PlanQueue) cs.getQueue("a")); + ReservationQueue a2 = + new ReservationQueue(cs, "a2", (PlanQueue) 
cs.getQueue("a")); cs.addQueue(a2); @@ -162,8 +162,8 @@ public class TestCapacitySchedulerDynamicBehavior { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); // Test add one reservation dynamically and manually modify capacity - AutoCreatedLeafQueue a1 = - new AutoCreatedLeafQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); + ReservationQueue a1 = + new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a")); cs.addQueue(a1); a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f)); @@ -230,8 +230,8 @@ public class TestCapacitySchedulerDynamicBehavior { // create the default reservation queue String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX; - AutoCreatedLeafQueue defQ = - new AutoCreatedLeafQueue(scheduler, defQName, + ReservationQueue defQ = + new ReservationQueue(scheduler, defQName, (PlanQueue) scheduler.getQueue("a")); scheduler.addQueue(defQ); defQ.setEntitlement(new QueueEntitlement(1f, 1f)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java new file mode 100644 index 0000000..c71d2bf --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestGuaranteedOrZeroCapacityOverTimePolicy.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestGuaranteedOrZeroCapacityOverTimePolicy { + + @Test + public void testGetMaxLeavesToBeActivated() + throws SchedulerDynamicEditException { + GuaranteedOrZeroCapacityOverTimePolicy policy = + new GuaranteedOrZeroCapacityOverTimePolicy(); + + assertEquals(1, policy.getMaxLeavesToBeActivated(0.17f, 0.03f, 1)); + assertEquals(5, policy.getMaxLeavesToBeActivated(0.17f, 0.03f, 7)); + assertEquals(0, policy.getMaxLeavesToBeActivated(0, 0.03f, 10)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 1426e88..c45bdb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -18,6 +18,14 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager + .NO_LABEL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CSQueueUtils.EPSILON; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CapacitySchedulerConfiguration.DOT; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CapacitySchedulerConfiguration.ROOT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -44,6 +52,7 @@ import java.util.concurrent.CyclicBarrier; import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -189,7 +198,7 @@ public class TestLeafQueue { root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, + ROOT, queues, queues, TestUtils.spyHook); root.updateClusterResource(Resources.createResource(100 * 16 * GB, 100 * 32), @@ -222,12 +231,12 @@ public class TestLeafQueue { final String newRoot) { // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {newRoot}); - conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100); - conf.setAcl(CapacitySchedulerConfiguration.ROOT, + conf.setQueues(ROOT, new String[] {newRoot}); + conf.setMaximumCapacity(ROOT, 100); + conf.setAcl(ROOT, QueueACL.SUBMIT_APPLICATIONS, " "); - final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot; + final String Q_newRoot = ROOT + "." 
+ newRoot; conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E}); conf.setCapacity(Q_newRoot, 100); conf.setMaximumCapacity(Q_newRoot, 100); @@ -410,7 +419,7 @@ public class TestLeafQueue { CapacitySchedulerConfiguration testConf = new CapacitySchedulerConfiguration(); - String tproot = CapacitySchedulerConfiguration.ROOT + "." + + String tproot = ROOT + "." + "testPolicyRoot" + System.currentTimeMillis(); OrderingPolicy<FiCaSchedulerApp> comPol = @@ -485,7 +494,7 @@ public class TestLeafQueue { CapacitySchedulerConfiguration testConf = new CapacitySchedulerConfiguration(); - String tproot = CapacitySchedulerConfiguration.ROOT + "." + + String tproot = ROOT + "." + "testPolicyRoot" + System.currentTimeMillis(); OrderingPolicy<FiCaSchedulerApp> schedOrder = @@ -722,12 +731,12 @@ public class TestLeafQueue { Priority priority = TestUtils.createMockPriority(1); app0.updateResourceRequests(Collections.singletonList(TestUtils .createResourceRequest(ResourceRequest.ANY, 20 * GB, 29, 1, true, - priority, recordFactory, RMNodeLabelsManager.NO_LABEL))); + priority, recordFactory, NO_LABEL))); assign = b.assignContainers(clusterResource, node0, new ResourceLimits( clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); app0.updateResourceRequests(Collections.singletonList(TestUtils .createResourceRequest(ResourceRequest.ANY, 10 * GB, 29, 2, true, - priority, recordFactory, RMNodeLabelsManager.NO_LABEL))); + priority, recordFactory, NO_LABEL))); assign = b.assignContainers(clusterResource, node0, new ResourceLimits( clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertTrue("Still within limits, should assign", @@ -796,11 +805,11 @@ public class TestLeafQueue { Priority priority = TestUtils.createMockPriority(1); app0.updateResourceRequests(Collections.singletonList(TestUtils .createResourceRequest(ResourceRequest.ANY, 1 * GB, 40, 10, true, - priority, recordFactory, RMNodeLabelsManager.NO_LABEL))); + priority, recordFactory, NO_LABEL))); 
app2.updateResourceRequests(Collections.singletonList(TestUtils .createResourceRequest(ResourceRequest.ANY, 2 * GB, 10, 10, true, - priority, recordFactory, RMNodeLabelsManager.NO_LABEL))); + priority, recordFactory, NO_LABEL))); /** * Start testing... @@ -2277,7 +2286,7 @@ public class TestLeafQueue { CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 1); Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>(); CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext, - csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, + csConf, null, ROOT, newQueues, queues, TestUtils.spyHook); root.reinitialize(newRoot, cs.getClusterResource()); @@ -2712,9 +2721,12 @@ public class TestLeafQueue { CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2); Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>(); - CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext, - csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, - TestUtils.spyHook); + CSQueue newRoot = + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + ROOT, + newQueues, queues, + TestUtils.spyHook); + queues = newQueues; root.reinitialize(newRoot, csContext.getClusterResource()); // after reinitialization @@ -2738,7 +2750,7 @@ public class TestLeafQueue { Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>(); CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, + ROOT, newQueues, queues, TestUtils.spyHook); root.reinitialize(newRoot, cs.getClusterResource()); @@ -3621,7 +3633,7 @@ public class TestLeafQueue { assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, e.getTotalPendingResourcesConsideringUserLimit(clusterResource, - RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + NO_LABEL, false).getMemorySize()); // Assign 2nd container of 1GB 
applyCSAssignment(clusterResource, @@ -3635,7 +3647,7 @@ public class TestLeafQueue { assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // Can't allocate 3rd container due to user-limit. Headroom still 0. applyCSAssignment(clusterResource, @@ -3645,7 +3657,7 @@ public class TestLeafQueue { assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // Increase user-limit-factor from 1GB to 10GB (1% * 10 * 100GB = 10GB). // Pending for both app_0 and app_1 are still 3GB, so user-limit-factor @@ -3653,7 +3665,7 @@ public class TestLeafQueue { // getTotalPendingResourcesConsideringUserLimit() e.setUserLimitFactor(10.0f); assertEquals(3*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); applyCSAssignment(clusterResource, e.assignContainers(clusterResource, node_0, @@ -3663,7 +3675,7 @@ public class TestLeafQueue { assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // Get the last 2 containers for app_1, no more pending requests. 
applyCSAssignment(clusterResource, @@ -3677,7 +3689,7 @@ public class TestLeafQueue { assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // Release each container from app_0 for (RMContainer rmContainer : app_0.getLiveContainers()) { @@ -3788,7 +3800,7 @@ public class TestLeafQueue { // With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0, // queue 'e' should be able to consume 1GB per user. assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // None of the apps have assigned resources // user_0's apps: assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize()); @@ -3805,7 +3817,7 @@ public class TestLeafQueue { // The first container was assigned to user_0's app_0. Queues total headroom // has 1GB left for user_1. assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // user_0's apps: assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -3823,7 +3835,7 @@ public class TestLeafQueue { // this container went to user_0's app_1. 
so, headroom for queue 'e'e is // still 1GB for user_1 assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // user_0's apps: assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -3839,7 +3851,7 @@ public class TestLeafQueue { // Container was allocated to user_1's app_2 since user_1, Now, no headroom // is left. assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // user_0's apps: assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -3855,7 +3867,7 @@ public class TestLeafQueue { // Allocated to user_1's app_2 since scheduler allocates 1 container // above user resource limit. Available headroom still 0. assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // user_0's apps: long app_0_consumption = app_0.getCurrentConsumption().getMemorySize(); assertEquals(1*GB, app_0_consumption); @@ -3875,7 +3887,7 @@ public class TestLeafQueue { // Cannot allocate 5th container because both users are above their allowed // user resource limit. Values should be the same as previously. 
assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // user_0's apps: assertEquals(app_0_consumption, app_0.getCurrentConsumption().getMemorySize()); assertEquals(app_1_consumption, app_1.getCurrentConsumption().getMemorySize()); @@ -3894,7 +3906,7 @@ public class TestLeafQueue { SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps); // Next container goes to user_0's app_1, since it still wanted 1GB. assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); // user_0's apps: assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize()); @@ -3909,7 +3921,7 @@ public class TestLeafQueue { // Last container goes to user_1's app_3, since it still wanted 1GB. 
// user_0's apps: assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( - clusterResource, RMNodeLabelsManager.NO_LABEL, false).getMemorySize()); + clusterResource, NO_LABEL, false).getMemorySize()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize()); // user_1's apps: @@ -4027,6 +4039,59 @@ public class TestLeafQueue { app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f); } + @Test + public void testSetupQueueConfigsWithSpecifiedConfiguration() + throws IOException { + + try { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( + new Configuration(false), false); + + final String leafQueueName = + "testSetupQueueConfigsWithSpecifiedConfiguration"; + + assertEquals(0, conf.size()); + conf.setNodeLocalityDelay(60); + conf.setCapacity(ROOT + DOT + leafQueueName, 10); + conf.setMaximumCapacity(ROOT + DOT + leafQueueName, 100); + conf.setUserLimitFactor(ROOT + DOT +leafQueueName, 0.1f); + + csConf.setNodeLocalityDelay(30); + csConf.setGlobalMaximumApplicationsPerQueue(20); + + LeafQueue leafQueue = new LeafQueue(csContext, conf, + leafQueueName, cs.getRootQueue(), + null); + + assertEquals(30, leafQueue.getNodeLocalityDelay()); + assertEquals(20, leafQueue.getMaxApplications()); + assertEquals(2, leafQueue.getMaxApplicationsPerUser()); + + //check queue configs + conf.setMaximumAMResourcePercentPerPartition(leafQueue.getQueueName(), + NO_LABEL, 10); + conf.setMaximumCapacity(leafQueue.getQueueName(), 10); + + assertEquals(0.1, leafQueue.getMaxAMResourcePerQueuePercent(), + EPSILON); + assertEquals(1, leafQueue.getMaximumCapacity(), + EPSILON); + assertEquals(0.1, leafQueue.getCapacity(), + EPSILON); + assertEquals(0.1, leafQueue.getAbsoluteCapacity(), + EPSILON); + assertEquals(1.0, leafQueue.getAbsoluteMaximumCapacity(), + EPSILON); + + } finally { + //revert config changes + csConf.setNodeLocalityDelay( + 
CapacitySchedulerConfiguration.DEFAULT_NODE_LOCALITY_DELAY); + csConf.setGlobalMaximumApplicationsPerQueue( + (int) CapacitySchedulerConfiguration.UNDEFINED); + } + } + private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); ApplicationAttemptId attId = http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java new file mode 100644 index 0000000..4dc56fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueManagementDynamicEditPolicy.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy; +import org.junit.Before; +import org.junit.Test; + + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.CSQueueUtils.EPSILON; +import static org.junit.Assert.assertEquals; + +public class TestQueueManagementDynamicEditPolicy extends + TestCapacitySchedulerAutoCreatedQueueBase { + private QueueManagementDynamicEditPolicy policy = new + QueueManagementDynamicEditPolicy(); + + @Before + public void setUp() throws Exception { + super.setUp(); + policy.init(cs.getConfiguration(), cs.getRMContext(), cs); + } + + @Test + public void testEditSchedule() throws Exception { + + try { + policy.editSchedule(); + assertEquals(2, policy.getManagedParentQueues().size()); + + CSQueue parentQueue = cs.getQueue(PARENT_QUEUE); + + GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy = + (GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) + parentQueue) + .getAutoCreatedQueueManagementPolicy(); + assertEquals(0f, autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(), EPSILON); + + //submit app1 as USER1 + ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1, 1, + 1); + validateInitialQueueEntitlement(parentQueue, USER1, 0.1f); + + 
//submit another app2 as USER2 + ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2, + 1); + validateInitialQueueEntitlement(parentQueue, USER2, 0.2f); + + //validate total activated abs capacity + assertEquals(0.2f, autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(), EPSILON); + + //submit user_3 app. This cant be scheduled since there is no capacity + submitApp(mockRM, parentQueue, USER3, USER3, 3, 1); + final CSQueue user3LeafQueue = cs.getQueue(USER3); + validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f, + 1.0f, 1.0f); + + assertEquals(autoCreatedQueueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity(), 0.2f, EPSILON); + + //deactivate USER2 queue + cs.killAllAppsInQueue(USER2); + mockRM.waitForState(user2AppId, RMAppState.KILLED); + + //deactivate USER1 queue + cs.killAllAppsInQueue(USER1); + mockRM.waitForState(user1AppId, RMAppState.KILLED); + + policy.editSchedule(); + + waitForPolicyState(0.1f, autoCreatedQueueManagementPolicy, 1000); + + validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.5f, 0.1f, + 1.0f, 1.0f); + + validateCapacitiesByLabel((ManagedParentQueue) parentQueue, (AutoCreatedLeafQueue) user3LeafQueue, + NODEL_LABEL_GPU); + + } finally { + cleanupQueue(USER1); + cleanupQueue(USER2); + cleanupQueue(USER3); + } + } + + private void waitForPolicyState(float expectedVal, + GuaranteedOrZeroCapacityOverTimePolicy queueManagementPolicy, int + timesec) throws + InterruptedException { + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < timesec * 1000) { + if (Float.compare(expectedVal, queueManagementPolicy + .getAbsoluteActivatedChildQueueCapacity()) != 0) { + Thread.sleep(100); + } else { + break; + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
