http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java new file mode 100644 index 0000000..bd18a2f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java @@ -0,0 +1,611 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.CapacityOverTimePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; +import org.mortbay.log.Log; + +public class TestGreedyReservationAgent { + + ReservationAgent agent; + InMemoryPlan plan; + Resource minAlloc = Resource.newInstance(1024, 1); + ResourceCalculator res = new DefaultResourceCalculator(); + Resource maxAlloc = Resource.newInstance(1024 * 8, 8); + Random rand = new Random(); + long step; + + @Before + public void setup() throws Exception { + + long seed = rand.nextLong(); + rand.setSeed(seed); + Log.info("Running with seed: " + seed); + + // setting completely loose quotas + long timeWindow = 1000000L; + Resource clusterCapacity = Resource.newInstance(100 * 1024, 100); + step = 1000L; + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + String reservationQ = testUtil.getFullReservationQueueName(); + + float instConstraint = 100; + float avgConstraint = 100; + + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, conf); + agent = new GreedyReservationAgent(); + + QueueMetrics queueMetrics = mock(QueueMetrics.class); + + plan = new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, + res, minAlloc, maxAlloc, "dedicated", null, true); + } + + @SuppressWarnings("javadoc") + @Test + public void testSimple() throws PlanningException { + + prepareBasicPlan(); + + // create a request with a single atomic ask + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(5 * step); + rr.setDeadline(20 * step); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 5, 10 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(Collections.singletonList(r)); + rr.setReservationRequests(reqs); + + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + System.out.println("--------AFTER SIMPLE ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + for (long i = 10 * step; i < 20 * step; i++) { + assertTrue( + "Agent-based allocation unexpected", + Resources.equals(cs.getResourcesAtTime(i), + Resource.newInstance(2048 * 10, 2 * 10))); + } + + } + + @Test + public void testOrder() throws PlanningException { + prepareBasicPlan(); + + // create a completely utilized segment around time 30 + int[] f = { 100, 100 }; + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", 30 * step, 30 * step + f.length * step, + ReservationSystemTestUtil.generateAllocation(30 * step, step, f), + res, minAlloc))); + + // create a chain of 4 RR, mixing gang and non-gang + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(0 * step); + rr.setDeadline(70 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 1, 10 * step); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 10, 10, 20 * step); + List<ReservationRequest> list = new ArrayList<ReservationRequest>(); + list.add(r); + list.add(r2); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + // validate + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 4); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1)); + assertTrue(cs.toString(), check(cs, 40 * step, 50 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 50 * step, 70 * step, 10, 1024, 1)); + + System.out.println("--------AFTER ORDER ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testOrderNoGapImpossible() throws PlanningException { + prepareBasicPlan(); + // create a completely utilized segment at time 30 + int[] f = { 100, 100 }; + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", 30 * step, 30 * step + f.length * step, + ReservationSystemTestUtil.generateAllocation(30 * step, step, f), + res, minAlloc))); + + // create a chain of 4 RR, mixing gang and non-gang + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(0L); + + rr.setDeadline(70L); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 1, 10); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 10, 10, 20); + List<ReservationRequest> list = new ArrayList<ReservationRequest>(); + list.add(r); + list.add(r2); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + boolean result = false; + try { + // submit to agent + result = agent.createReservation(reservationID, "u1", plan, rr); + fail(); + } catch (PlanningException p) { + // expected + } + + // validate + assertFalse("Agent-based allocation should have failed", result); + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == 3); + + System.out + .println("--------AFTER ORDER_NO_GAP IMPOSSIBLE ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testOrderNoGap() throws PlanningException { + prepareBasicPlan(); + // create a chain of 4 RR, mixing gang and non-gang + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(0 * step); + rr.setDeadline(60 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ORDER_NO_GAP); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 1, 10 * step); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 10, 10, 20 * step); + List<ReservationRequest> list = new ArrayList<ReservationRequest>(); + list.add(r); + list.add(r2); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + System.out.println("--------AFTER ORDER ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + // validate + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 0 * step, 10 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 10 * step, 30 * step, 10, 1024, 1)); + assertTrue(cs.toString(), check(cs, 30 * step, 40 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 40 * step, 60 * step, 10, 1024, 1)); + + } + + @Test + public void testSingleSliding() throws PlanningException { + prepareBasicPlan(); + + // create a single request for which we need subsequent (tight) packing. + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100 * step); + rr.setDeadline(120 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 200, 10, 10 * step); + + List<ReservationRequest> list = new ArrayList<ReservationRequest>(); + list.add(r); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + // validate results, we expect the second one to be accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 100 * step, 120 * step, 100, 1024, 1)); + + System.out.println("--------AFTER packed ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testAny() throws PlanningException { + prepareBasicPlan(); + // create an ANY request, with an impossible step (last in list, first + // considered), + // and two satisfiable ones. We expect the second one to be returned. + + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100 * step); + rr.setDeadline(120 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ANY); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 5, 5, 10 * step); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 5, 10 * step); + ReservationRequest r3 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 110, 110, 10 * step); + + List<ReservationRequest> list = new ArrayList<ReservationRequest>(); + list.add(r); + list.add(r2); + list.add(r3); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + boolean res = agent.createReservation(reservationID, "u1", plan, rr); + + // validate results, we expect the second one to be accepted + assertTrue("Agent-based allocation failed", res); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 20, 1024, 1)); + + System.out.println("--------AFTER ANY ALLOCATION (queue: " + reservationID + + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testAnyImpossible() throws PlanningException { + prepareBasicPlan(); + // create an ANY request, with all impossible alternatives + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100L); + rr.setDeadline(120L); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ANY); + + // longer than arrival-deadline + ReservationRequest r1 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 35, 5, 30); + // above max cluster size + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 110, 110, 10); + + List<ReservationRequest> list = new ArrayList<ReservationRequest>(); + list.add(r1); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + boolean result = false; + try { + // submit to agent + result = agent.createReservation(reservationID, "u1", plan, rr); + fail(); + } catch (PlanningException p) { + // expected + } + // validate results, we expect the second one to be accepted + assertFalse("Agent-based allocation should have failed", result); + assertTrue("Agent-based allocation should have failed", plan + .getAllReservations().size() == 2); + + System.out.println("--------AFTER ANY IMPOSSIBLE ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testAll() throws PlanningException { + prepareBasicPlan(); + // create an ALL request + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100 * step); + rr.setDeadline(120 * step); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 5, 5, 10 * step); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 10, 10, 20 * step); + + List<ReservationRequest> list = new ArrayList<ReservationRequest>(); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + // submit to agent + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + agent.createReservation(reservationID, "u1", plan, rr); + + // validate results, we expect the second one to be accepted + assertTrue("Agent-based allocation failed", reservationID != null); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 3); + + ReservationAllocation cs = plan.getReservationById(reservationID); + + assertTrue(cs.toString(), check(cs, 100 * step, 110 * step, 20, 1024, 1)); + assertTrue(cs.toString(), check(cs, 110 * step, 120 * step, 25, 1024, 1)); + + System.out.println("--------AFTER ALL ALLOCATION (queue: " + reservationID + + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + @Test + public void testAllImpossible() throws PlanningException { + prepareBasicPlan(); + // create an ALL request, with an impossible combination, it should be + // rejected, and allocation remain unchanged + ReservationDefinition rr = new ReservationDefinitionPBImpl(); + rr.setArrival(100L); + rr.setDeadline(120L); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + ReservationRequest r = ReservationRequest.newInstance( + Resource.newInstance(1024, 1), 55, 5, 10); + ReservationRequest r2 = ReservationRequest.newInstance( + Resource.newInstance(2048, 2), 55, 5, 20); + + List<ReservationRequest> list = new ArrayList<ReservationRequest>(); + list.add(r); + list.add(r2); + reqs.setReservationResources(list); + rr.setReservationRequests(reqs); + + ReservationId reservationID = ReservationSystemTestUtil + .getNewReservationId(); + boolean result = false; + try { + // submit to agent + result = agent.createReservation(reservationID, "u1", plan, rr); + fail(); + } catch (PlanningException p) { + // expected + } + + // validate results, we expect the second one to be accepted + assertFalse("Agent-based allocation failed", result); + assertTrue("Agent-based allocation failed", plan.getAllReservations() + .size() == 2); + + System.out.println("--------AFTER ALL IMPOSSIBLE ALLOCATION (queue: " + + reservationID + ")----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + + } + + private void prepareBasicPlan() throws PlanningException { + + // insert in the reservation a couple of controlled reservations, to create + // conditions for assignment that are non-empty + + int[] f = { 10, 10, 20, 20, 20, 10, 10 }; + + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil + .generateAllocation(0, step, f), res, minAlloc))); + + int[] f2 = { 5, 5, 5, 5, 5, 5, 5 }; + Map<ReservationInterval, Resource> alloc = + ReservationSystemTestUtil.generateAllocation(5000, step, f2); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation( + ReservationSystemTestUtil.getNewReservationId(), null, "u1", + "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc))); + + System.out.println("--------BEFORE AGENT----------"); + System.out.println(plan.toString()); + System.out.println(plan.toCumulativeString()); + } + + private boolean check(ReservationAllocation cs, long start, long end, + int containers, int mem, int cores) { + + boolean res = true; + for (long i = start; i < end; i++) { + res = res + && Resources.equals(cs.getResourcesAtTime(i), + Resource.newInstance(mem * containers, cores * containers)); + } + return res; + } + + public void testStress(int numJobs) throws PlanningException, IOException { + + long timeWindow = 1000000L; + Resource clusterCapacity = Resource.newInstance(500 * 100 * 1024, 500 * 32); + step = 1000L; + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100); + String reservationQ = testUtil.getFullReservationQueueName(); + float instConstraint = 100; + float avgConstraint = 100; + ReservationSchedulerConfiguration conf = + ReservationSystemTestUtil.createConf(reservationQ, timeWindow, + instConstraint, avgConstraint); + CapacityOverTimePolicy policy = new CapacityOverTimePolicy(); + policy.init(reservationQ, conf); + + plan = new InMemoryPlan(scheduler.getRootQueueMetrics(), policy, agent, + clusterCapacity, step, res, minAlloc, maxAlloc, "dedicated", null, true); + + int acc = 0; + List<ReservationDefinition> list = new ArrayList<ReservationDefinition>(); + for (long i = 0; i < numJobs; i++) { + list.add(ReservationSystemTestUtil.generateRandomRR(rand, i)); + } + + long start = System.currentTimeMillis(); + for (int i = 0; i < numJobs; i++) { + + try { + if (agent.createReservation( + ReservationSystemTestUtil.getNewReservationId(), "u" + i % 100, + plan, list.get(i))) { + acc++; + } + } catch (PlanningException p) { + // ignore exceptions + } + } + + long end = System.currentTimeMillis(); + System.out.println("Submitted " + numJobs + " jobs " + " accepted " + acc + + " in " + (end - start) + "ms"); + } + + public static void main(String[] arg) { + + // run a stress test with by default 1000 random jobs + int numJobs = 1000; + if (arg.length > 0) { + numJobs = Integer.parseInt(arg[0]); + } + + try { + TestGreedyReservationAgent test = new TestGreedyReservationAgent(); + test.setup(); + test.testStress(numJobs); + } catch (Exception e) { + e.printStackTrace(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java new file mode 100644 index 0000000..aeb1e6a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java @@ -0,0 +1,170 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryPlan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.NoOverCommitPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.SharingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.junit.Test; + +public class TestSimpleCapacityReplanner { + + @Test + public void testReplanningPlanCapacityLoss() throws PlanningException { + + Resource clusterCapacity = Resource.newInstance(100 * 1024, 10); + Resource minAlloc = Resource.newInstance(1024, 1); + Resource maxAlloc = Resource.newInstance(1024 * 8, 8); + + ResourceCalculator res = new DefaultResourceCalculator(); + long step = 1L; + Clock clock = mock(Clock.class); + ReservationAgent agent = mock(ReservationAgent.class); + + SharingPolicy policy = new NoOverCommitPolicy(); + policy.init("root.dedicated", null); + + QueueMetrics queueMetrics = mock(QueueMetrics.class); + + when(clock.getTime()).thenReturn(0L); + SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock); + + ReservationSchedulerConfiguration conf = + mock(ReservationSchedulerConfiguration.class); + when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L); + + enf.init("blah", conf); + + // Initialize the plan with more resources + InMemoryPlan plan = + new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step, + res, minAlloc, maxAlloc, "dedicated", enf, true, clock); + + // add reservation filling the plan (separating them 1ms, so we are sure + // s2 follows s1 on acceptance + long ts = System.currentTimeMillis(); + ReservationId r1 = ReservationId.newInstance(ts, 1); + int[] f5 = { 20, 20, 20, 20, 20 }; + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3", + "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, + minAlloc))); + when(clock.getTime()).thenReturn(1L); + ReservationId r2 = ReservationId.newInstance(ts, 2); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4", + "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, + minAlloc))); + when(clock.getTime()).thenReturn(2L); + ReservationId r3 = ReservationId.newInstance(ts, 3); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5", + "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, + minAlloc))); + when(clock.getTime()).thenReturn(3L); + ReservationId r4 = ReservationId.newInstance(ts, 4); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6", + "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, + minAlloc))); + when(clock.getTime()).thenReturn(4L); + ReservationId r5 = ReservationId.newInstance(ts, 5); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7", + "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, + minAlloc))); + + int[] f6 = { 50, 50, 50, 50, 50 }; + ReservationId r6 = ReservationId.newInstance(ts, 6); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3", + "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res, + minAlloc))); + when(clock.getTime()).thenReturn(6L); + ReservationId r7 = ReservationId.newInstance(ts, 7); + assertTrue(plan.toString(), + plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4", + "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res, + minAlloc))); + + // remove some of the resources (requires replanning) + plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70)); + + when(clock.getTime()).thenReturn(0L); + + // run the replanner + enf.plan(plan, null); + + // check which reservation are still present + assertNotNull(plan.getReservationById(r1)); + assertNotNull(plan.getReservationById(r2)); + assertNotNull(plan.getReservationById(r3)); + assertNotNull(plan.getReservationById(r6)); + assertNotNull(plan.getReservationById(r7)); + + // and which ones are removed + assertNull(plan.getReservationById(r4)); + assertNull(plan.getReservationById(r5)); + + // check resources at each moment in time no more exceed capacity + for (int i = 0; i < 20; i++) { + int tot = 0; + for (ReservationAllocation r : plan.getReservationsAtTime(i)) { + tot = r.getResourcesAtTime(i).getMemory(); + } + assertTrue(tot <= 70 * 1024); + } + } + + private Map<ReservationInterval, Resource> generateAllocation( + int startTime, int[] alloc) { + Map<ReservationInterval, Resource> req = + new TreeMap<ReservationInterval, Resource>(); + for (int i = 0; i < alloc.length; i++) { + req.put(new ReservationInterval(startTime + i, startTime + i + 1), + ReservationSystemUtil.toResource( + ReservationRequest.newInstance(Resource.newInstance(1024, 1), + alloc[i]))); + } + return req; + } + +}