Repository: hadoop Updated Branches: refs/heads/branch-2 56b82de6e -> 4ee55d032
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee55d03/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 6f759ce..5bdcc08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -18,18 +18,31 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; + import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -37,6 +50,11 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -84,7 +102,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { conf = null; } - private void startResourceManager(float utilizationThreshold) { + private void startResourceManagerWithStubbedFairScheduler(float utilizationThreshold) { conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, utilizationThreshold); resourceManager = new MockRM(conf); @@ -98,6 +116,51 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler.updateInterval = 60 * 1000; } + // YARN-4648: The starting code for ResourceManager mock is originated from + // TestFairScheduler. 
It should be kept as it was to guarantee no change in the + // behaviour of ResourceManager preemption. + private void startResourceManagerWithRealFairScheduler() { + scheduler = new FairScheduler(); + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); + conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, + 1024); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); + conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false); + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + conf.setFloat( + FairSchedulerConfiguration + .RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE, + TEST_RESERVATION_THRESHOLD); + + resourceManager = new MockRM(conf); + + // TODO: This test should really be using MockRM. For now starting stuff + // that is needed at a bare minimum. + ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + resourceManager.getRMContext().getStateStore().start(); + + // to initialize the master key + resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); + + scheduler.setRMContext(resourceManager.getRMContext()); + } + + private void stopResourceManager() { + if (scheduler != null) { + scheduler.stop(); + scheduler = null; + } + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.shutdown(); + } + private void registerNodeAndSubmitApp( int memory, int vcores, int appContainers, int appMemory) { RMNode node1 = MockNodes.newNodeInfo( @@ -143,7 +206,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { out.println("</allocations>"); out.close(); - startResourceManager(0f); + startResourceManagerWithStubbedFairScheduler(0f); // Create node with 4GB memory and 4 vcores 
registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024); @@ -159,7 +222,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { resourceManager.stop(); - startResourceManager(0.8f); + startResourceManagerWithStubbedFairScheduler(0.8f); // Create node with 4GB memory and 4 vcores registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024); @@ -175,7 +238,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { resourceManager.stop(); - startResourceManager(0.7f); + startResourceManagerWithStubbedFairScheduler(0.7f); // Create node with 4GB memory and 4 vcores registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024); @@ -189,4 +252,1226 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { assertEquals("preemptResources() should have been called", 1024, ((StubbedFairScheduler) scheduler).lastPreemptMemory); } + + @Test (timeout = 5000) + /** + * Make sure containers are chosen to be preempted in the correct order. + */ + public void testChoiceOfPreemptedContainers() throws Exception { + startResourceManagerWithRealFairScheduler(); + conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); + + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>.25</weight>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>.25</weight>"); + out.println("</queue>"); + out.println("<queue name=\"queueC\">"); + out.println("<weight>.25</weight>"); + out.println("</queue>"); + out.println("<queue name=\"default\">"); + 
out.println("<weight>.25</weight>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create two nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + // Queue A and B each request two applications + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1); + createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3); + createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2); + + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1); + createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3); + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3); + createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4); + + scheduler.update(); + + scheduler.getQueueManager().getLeafQueue("queueA", true) + .setPolicy(SchedulingPolicy.parse("fifo")); + scheduler.getQueueManager().getLeafQueue("queueB", true) + .setPolicy(SchedulingPolicy.parse("fair")); + + // Sufficient node check-ins to fully schedule containers + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + for (int i = 0; i < 4; i++) { + scheduler.handle(nodeUpdate1); + scheduler.handle(nodeUpdate2); + } + + assertEquals(2, 
scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + + // Now new requests arrive from queueC and default + createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); + scheduler.update(); + + // We should be able to claw back one container from queueA and queueB each. + scheduler.preemptResources(Resources.createResource(2 * 1024)); + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + + // First verify we are adding containers to preemption list for the app. + // For queueA (fifo), app2 is selected. + // For queueB (fair), app4 is selected. 
+ assertTrue("App2 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); + assertTrue("App4 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app4).getLiveContainers(), + scheduler.getSchedulerApp(app4).getPreemptionContainers())); + + // Pretend 15 seconds have passed + clock.tickSec(15); + + // Trigger a kill by insisting we want containers back + scheduler.preemptResources(Resources.createResource(2 * 1024)); + + // At this point the containers should have been killed (since we are not simulating AM) + assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + // Inside each app, containers are sorted according to their priorities. + // Containers with priority 4 are preempted for app2 and app4. + Set<RMContainer> set = new HashSet<RMContainer>(); + for (RMContainer container : + scheduler.getSchedulerApp(app2).getLiveContainers()) { + if (container.getAllocatedPriority().getPriority() == 4) { + set.add(container); + } + } + for (RMContainer container : + scheduler.getSchedulerApp(app4).getLiveContainers()) { + if (container.getAllocatedPriority().getPriority() == 4) { + set.add(container); + } + } + assertTrue("Containers with priority=4 in app2 and app4 should be " + + "preempted.", set.isEmpty()); + + // Trigger a kill by insisting we want containers back + scheduler.preemptResources(Resources.createResource(2 * 1024)); + + // Pretend 15 seconds have passed + clock.tickSec(15); + + // We should be able to claw back another container from A and B each. + // For queueA (fifo), continue preempting from app2. + // For queueB (fair), even though app4 has a lowest priority container with p=4, it + // still preempts from app3 as app3 is most over fair share. 
+ scheduler.preemptResources(Resources.createResource(2 * 1024)); + + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + + // Now A and B are below fair share, so preemption shouldn't do anything + scheduler.preemptResources(Resources.createResource(2 * 1024)); + assertTrue("App1 should have no container to be preempted", + scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty()); + assertTrue("App2 should have no container to be preempted", + scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty()); + assertTrue("App3 should have no container to be preempted", + scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty()); + assertTrue("App4 should have no container to be preempted", + scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty()); + stopResourceManager(); + } + + @Test + public void testPreemptionIsNotDelayedToNextRound() throws Exception { + startResourceManagerWithRealFairScheduler(); + + conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); + + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>8</weight>"); + out.println("<queue name=\"queueA1\" />"); + out.println("<queue name=\"queueA2\" />"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + 
out.println("<weight>2</weight>"); + out.println("</queue>"); + out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node of 8G + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(8 * 1024, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Run apps in queueA.A1 and queueB + ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1, + "queueA.queueA1", "user1", 7, 1); + // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1); + ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB", + "user2", 1, 1); + + scheduler.update(); + + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 8; i++) { + scheduler.handle(nodeUpdate1); + } + + // verify if the apps got the containers they requested + assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + + // Now submit an app in queueA.queueA2 + ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1, + "queueA.queueA2", "user3", 7, 1); + scheduler.update(); + + // Let 11 sec pass + clock.tickSec(11); + + scheduler.update(); + Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager() + .getLeafQueue("queueA.queueA2", false), clock.getTime()); + assertEquals(3277, toPreempt.getMemory()); + + // verify if the 3 containers required by queueA2 are preempted in the same + // round + scheduler.preemptResources(toPreempt); + assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers() + .size()); + 
stopResourceManager(); + } + + @Test (timeout = 5000) + /** + * Tests the timing of decision to preempt tasks. + */ + public void testPreemptionDecision() throws Exception { + startResourceManagerWithRealFairScheduler(); + + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"default\">"); + out.println("<maxResources>0mb,0vcores</maxResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueC\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueD\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create three nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode 
node2 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + // Queue A and B each request three containers + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); + + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); + ApplicationAttemptId app5 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); + ApplicationAttemptId app6 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 2; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + } + + // Now new requests arrive from queues C and D + ApplicationAttemptId app7 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); + ApplicationAttemptId app8 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); + ApplicationAttemptId app9 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); + + ApplicationAttemptId app10 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1); + ApplicationAttemptId app11 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2); + 
ApplicationAttemptId app12 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3); + + scheduler.update(); + + FSLeafQueue schedC = + scheduler.getQueueManager().getLeafQueue("queueC", true); + FSLeafQueue schedD = + scheduler.getQueueManager().getLeafQueue("queueD", true); + + assertTrue(Resources.equals( + Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime()))); + assertTrue(Resources.equals( + Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime()))); + // After minSharePreemptionTime has passed, they should want to preempt min + // share. + clock.tickSec(6); + assertEquals( + 1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); + + // After fairSharePreemptionTime has passed, they should want to preempt + // fair share. + scheduler.update(); + clock.tickSec(6); + assertEquals( + 1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); + stopResourceManager(); + } + + @Test +/** + * Tests the timing of decision to preempt tasks. 
+ */ + public void testPreemptionDecisionWithDRF() throws Exception { + startResourceManagerWithRealFairScheduler(); + + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"default\">"); + out.println("<maxResources>0mb,0vcores</maxResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,1vcores</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,2vcores</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueC\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,3vcores</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueD\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,2vcores</minResources>"); + out.println("</queue>"); + out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>"); + out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create three nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + 
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + // Queue A and B each request three containers + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); + + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); + ApplicationAttemptId app5 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); + ApplicationAttemptId app6 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 2; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + } + + // Now new requests arrive from queues C and D + ApplicationAttemptId app7 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); + ApplicationAttemptId app8 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); + ApplicationAttemptId app9 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); + + ApplicationAttemptId app10 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1); + ApplicationAttemptId app11 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2); + 
ApplicationAttemptId app12 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3); + + scheduler.update(); + + FSLeafQueue schedC = + scheduler.getQueueManager().getLeafQueue("queueC", true); + FSLeafQueue schedD = + scheduler.getQueueManager().getLeafQueue("queueD", true); + + assertTrue(Resources.equals( + Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime()))); + assertTrue(Resources.equals( + Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime()))); + + // Test : + // 1) whether componentWise min works as expected. + // 2) DRF calculator is used + + // After minSharePreemptionTime has passed, they should want to preempt min + // share. + clock.tickSec(6); + Resource res = scheduler.resourceDeficit(schedC, clock.getTime()); + assertEquals(1024, res.getMemory()); + // Demand = 3 + assertEquals(3, res.getVirtualCores()); + + res = scheduler.resourceDeficit(schedD, clock.getTime()); + assertEquals(1024, res.getMemory()); + // Demand = 6, but min share = 2 + assertEquals(2, res.getVirtualCores()); + + // After fairSharePreemptionTime has passed, they should want to preempt + // fair share. + scheduler.update(); + clock.tickSec(6); + res = scheduler.resourceDeficit(schedC, clock.getTime()); + assertEquals(1536, res.getMemory()); + assertEquals(3, res.getVirtualCores()); + + res = scheduler.resourceDeficit(schedD, clock.getTime()); + assertEquals(1536, res.getMemory()); + // Demand = 6, but fair share = 3 + assertEquals(3, res.getVirtualCores()); + stopResourceManager(); + } + + @Test + /** + * Tests the various timing of decision to preempt tasks. 
+ */ + public void testPreemptionDecisionWithVariousTimeout() throws Exception { + startResourceManagerWithRealFairScheduler(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"default\">"); + out.println("<maxResources>0mb,0vcores</maxResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>1</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>2</weight>"); + out.println("<minSharePreemptionTimeout>10</minSharePreemptionTimeout>"); + out.println("<fairSharePreemptionTimeout>25</fairSharePreemptionTimeout>"); + out.println("<queue name=\"queueB1\">"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>"); + out.println("</queue>"); + out.println("<queue name=\"queueB2\">"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("<fairSharePreemptionTimeout>20</fairSharePreemptionTimeout>"); + out.println("</queue>"); + out.println("</queue>"); + out.println("<queue name=\"queueC\">"); + out.println("<weight>1</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>"); + out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Check the min/fair share preemption timeout for each queue + QueueManager queueMgr = 
scheduler.getQueueManager(); + assertEquals(30000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("default") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueA") + .getFairSharePreemptionTimeout()); + assertEquals(25000, queueMgr.getQueue("queueB") + .getFairSharePreemptionTimeout()); + assertEquals(25000, queueMgr.getQueue("queueB.queueB1") + .getFairSharePreemptionTimeout()); + assertEquals(20000, queueMgr.getQueue("queueB.queueB2") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueC") + .getFairSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("root") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("default") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueA") + .getMinSharePreemptionTimeout()); + assertEquals(10000, queueMgr.getQueue("queueB") + .getMinSharePreemptionTimeout()); + assertEquals(5000, queueMgr.getQueue("queueB.queueB1") + .getMinSharePreemptionTimeout()); + assertEquals(10000, queueMgr.getQueue("queueB.queueB2") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueC") + .getMinSharePreemptionTimeout()); + + // Create one big node + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Queue A takes all resources + for (int i = 0; i < 6; i ++) { + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + } + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 6; i++) { + scheduler.handle(nodeUpdate1); + } + + // Now new requests arrive from queues B1, B2 and C + createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1); + 
createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2); + createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3); + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); + + scheduler.update(); + + FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true); + FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true); + FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true); + + assertTrue(Resources.equals( + Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime()))); + assertTrue(Resources.equals( + Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime()))); + assertTrue(Resources.equals( + Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime()))); + + // After 5 seconds, queueB1 wants to preempt min share + scheduler.update(); + clock.tickSec(6); + assertEquals( + 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); + assertEquals( + 0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); + assertEquals( + 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); + + // After 10 seconds, queueB2 wants to preempt min share + scheduler.update(); + clock.tickSec(5); + assertEquals( + 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); + assertEquals( + 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); + + // After 15 seconds, queueC wants to preempt min share + scheduler.update(); + clock.tickSec(5); + assertEquals( + 1024, scheduler.resourceDeficit(queueB1, 
clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); + + // After 20 seconds, queueB2 should want to preempt fair share + scheduler.update(); + clock.tickSec(5); + assertEquals( + 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); + + // After 25 seconds, queueB1 should want to preempt fair share + scheduler.update(); + clock.tickSec(5); + assertEquals( + 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); + assertEquals( + 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); + + // After 30 seconds, queueC should want to preempt fair share + scheduler.update(); + clock.tickSec(5); + assertEquals( + 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); + assertEquals( + 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); + stopResourceManager(); + } + + @Test + /** + * Tests the decision to preempt tasks with respect to non-preemptable queues + * 1, Queues as follows: + * queueA(non-preemptable) + * queueB(preemptable) + * parentQueue(non-preemptable) + * --queueC(preemptable) + * queueD(preemptable) + * 2, Submit request to queueA, queueB, queueC, and all of them are over MinShare + * 3, Now all resources are occupied + * 4, Submit request to queueD, and need to preempt resources from other queues + * 5, Only the preemptable queue (queueB) would be preempted. 
+ */ + public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception { + startResourceManagerWithRealFairScheduler(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"default\">"); + out.println("<maxResources>0mb,0vcores</maxResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("<allowPreemptionFrom>false</allowPreemptionFrom>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"parentQueue\">"); + out.println("<allowPreemptionFrom>false</allowPreemptionFrom>"); + out.println("<queue name=\"queueC\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("</queue>"); + out.println("<queue name=\"queueD\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>2048mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create four nodes(3G each) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1, + 
"127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + RMNode node4 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4, + "127.0.0.4"); + NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); + scheduler.handle(nodeEvent4); + + // Submit apps to queueA, queueB, queueC, + // now all resource of the cluster is occupied + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3); + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + + // Now new requests arrive from queues D + 
ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1); + scheduler.update(); + FSLeafQueue schedD = + scheduler.getQueueManager().getLeafQueue("queueD", true); + + // After minSharePreemptionTime has passed, 2G of resources should be preempted from + // queueB to queueD + clock.tickSec(6); + assertEquals(2048, + scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); + + scheduler.preemptResources(Resources.createResource(2 * 1024)); + // now only app2 is selected to be preempted + assertTrue("App2 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); + assertTrue("App1 should not have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app1).getLiveContainers(), + scheduler.getSchedulerApp(app1).getPreemptionContainers())); + assertTrue("App3 should not have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app3).getLiveContainers(), + scheduler.getSchedulerApp(app3).getPreemptionContainers())); + // Pretend 20 seconds have passed + clock.tickSec(20); + scheduler.preemptResources(Resources.createResource(2 * 1024)); + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + // after preemption + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, 
scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + stopResourceManager(); + } + + @Test + /** + * Tests the decision to preempt tasks when allowPreemptionFrom is set false on + * all queues. + * Then none of them would be preempted actually. + * 1, Queues as follows: + * queueA(non-preemptable) + * queueB(non-preemptable) + * parentQueue1(non-preemptable) + * --queueC(preemptable) + * parentQueue2(preemptable) + * --queueD(non-preemptable) + * 2, Submit request to queueB, queueC, queueD, and all of them are over MinShare + * 3, Now all resources are occupied + * 4, Submit request to queueA, and need to preempt resources from other queues + * 5, None of the queues would be preempted. + */ + public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues() + throws Exception { + startResourceManagerWithRealFairScheduler(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"default\">"); + out.println("<maxResources>0mb,0vcores</maxResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>2048mb,0vcores</minResources>"); + out.println("<allowPreemptionFrom>false</allowPreemptionFrom>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("<allowPreemptionFrom>false</allowPreemptionFrom>"); + out.println("</queue>"); + out.println("<queue name=\"parentQueue1\">"); + out.println("<allowPreemptionFrom>false</allowPreemptionFrom>"); + out.println("<queue name=\"queueC\">"); + 
out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("</queue>"); + out.println("<queue name=\"parentQueue2\">"); + out.println("<queue name=\"queueD\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("<allowPreemptionFrom>false</allowPreemptionFrom>"); + out.println("</queue>"); + out.println("</queue>"); + out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create four nodes(3G each) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + RMNode node4 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4, + "127.0.0.4"); + NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); + scheduler.handle(nodeEvent4); + + // Submit apps to queueB, queueC, queueD + // now all resource of the cluster is occupied + + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1); + ApplicationAttemptId app2 = + 
createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3); + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + + // Now new requests arrive from queues A + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1); + scheduler.update(); + FSLeafQueue schedA = + scheduler.getQueueManager().getLeafQueue("queueA", true); + + // After minSharePreemptionTime has passed, resource deficit is 2G + clock.tickSec(6); + assertEquals(2048, + scheduler.resourceDeficit(schedA, clock.getTime()).getMemory()); + + scheduler.preemptResources(Resources.createResource(2 * 1024)); + // now none app is selected to be preempted + assertTrue("App1 should have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app1).getLiveContainers(), + scheduler.getSchedulerApp(app1).getPreemptionContainers())); + assertTrue("App2 should not have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); + assertTrue("App3 should not have container to be preempted", + 
Collections.disjoint( + scheduler.getSchedulerApp(app3).getLiveContainers(), + scheduler.getSchedulerApp(app3).getPreemptionContainers())); + // Pretend 20 seconds have passed + clock.tickSec(20); + scheduler.preemptResources(Resources.createResource(2 * 1024)); + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + // after preemption + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + stopResourceManager(); + } + + @Test + public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { + startResourceManagerWithRealFairScheduler(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"default\">"); + out.println("</queue>"); + out.println("<queue name=\"queueA\">"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<queue name=\"queueB1\">"); + out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>"); + out.println("</queue>"); + out.println("<queue name=\"queueB2\">"); + out.println("</queue>"); + out.println("</queue>"); + out.println("<queue name=\"queueC\">"); + out.println("</queue>"); + 
out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>"); + out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>"); + out.print("<fairSharePreemptionTimeout>40</fairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Check the min/fair share preemption timeout for each queue + QueueManager queueMgr = scheduler.getQueueManager(); + assertEquals(30000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("default") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueA") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB.queueB1") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB.queueB2") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueC") + .getFairSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("root") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("default") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueA") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueB") + .getMinSharePreemptionTimeout()); + assertEquals(5000, queueMgr.getQueue("queueB.queueB1") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueB.queueB2") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueC") + .getMinSharePreemptionTimeout()); + + // If both exist, we take the default one + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"default\">"); + 
out.println("</queue>"); + out.println("<queue name=\"queueA\">"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<queue name=\"queueB1\">"); + out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>"); + out.println("</queue>"); + out.println("<queue name=\"queueB2\">"); + out.println("</queue>"); + out.println("</queue>"); + out.println("<queue name=\"queueC\">"); + out.println("</queue>"); + out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>"); + out.print("<defaultFairSharePreemptionTimeout>25</defaultFairSharePreemptionTimeout>"); + out.print("<fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + assertEquals(25000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + stopResourceManager(); + } + + @Test(timeout = 5000) + public void testRecoverRequestAfterPreemption() throws Exception { + startResourceManagerWithRealFairScheduler(); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); + + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + Priority priority = Priority.newInstance(20); + String host = "127.0.0.1"; + int GB = 1024; + + // Create Node and raised Node Added event + RMNode node = MockNodes.newNodeInfo(1, + Resources.createResource(16 * 1024, 4), 0, host); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + scheduler.handle(nodeEvent); + + // Create 3 container requests and place it in ask + List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); + ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host, + priority.getPriority(), 1, true); + ResourceRequest rackLocalRequest = createResourceRequest(GB, 1, + node.getRackName(), 
priority.getPriority(), 1, true); + ResourceRequest offRackRequest = createResourceRequest(GB, 1, + ResourceRequest.ANY, priority.getPriority(), 1, true); + ask.add(nodeLocalRequest); + ask.add(rackLocalRequest); + ask.add(offRackRequest); + + // Create Request and update + ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA", + "user1", ask); + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeUpdate); + + assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() + .size()); + SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId); + + // ResourceRequest will be empty once NodeUpdate is completed + Assert.assertNull(app.getResourceRequest(priority, host)); + + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + RMContainer rmContainer = app.getRMContainer(containerId1); + + // Create a preempt event and register for preemption + scheduler.warnOrKillContainer(rmContainer); + + // Wait for few clock ticks + clock.tickSec(5); + + // preempt now + scheduler.warnOrKillContainer(rmContainer); + + // Trigger container rescheduled event + scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer, + SchedulerEventType.KILL_PREEMPTED_CONTAINER)); + + List<ResourceRequest> requests = rmContainer.getResourceRequests(); + // Once recovered, resource request will be present again in app + Assert.assertEquals(3, requests.size()); + for (ResourceRequest request : requests) { + Assert.assertEquals(1, + app.getResourceRequest(priority, request.getResourceName()) + .getNumContainers()); + } + + // Send node heartbeat + scheduler.update(); + scheduler.handle(nodeUpdate); + + List<Container> containers = scheduler.allocate(appAttemptId, + Collections.<ResourceRequest> emptyList(), + Collections.<ContainerId> emptyList(), null, null, null, null).getContainers(); + + // Now 
with updated ResourceRequest, a container is allocated for AM. + Assert.assertTrue(containers.size() == 1); + stopResourceManager(); + } }
