// Source: http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java
// (new file, mode 100644, index 0000000..6dcd41f)
package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link InMemoryPlan}: adding, updating, deleting and archiving
 * reservations, and the per-tick resource accounting the plan reports for
 * them.
 *
 * <p>Every allocation used here asks for containers of 1024 MB / 1 vcore, one
 * {@link ReservationInterval} of length 1 per array element (see
 * {@link #generateAllocation(int, int[], boolean)}).
 */
public class TestInMemoryPlan {

  private String user = "yarn";
  private String planName = "test-reservation";
  private ResourceCalculator resCalc;
  private Resource minAlloc;
  private Resource maxAlloc;
  private Resource totalCapacity;

  private Clock clock;
  private QueueMetrics queueMetrics;
  private SharingPolicy policy;
  private ReservationAgent agent;
  private Planner replanner;

  /**
   * Creates the resource fixtures and mocks every collaborator handed to the
   * plan constructor. The mocked clock reports time 1 until a test overrides
   * it.
   */
  @Before
  public void setUp() throws PlanningException {
    resCalc = new DefaultResourceCalculator();
    minAlloc = Resource.newInstance(1024, 1);
    maxAlloc = Resource.newInstance(64 * 1024, 20);
    totalCapacity = Resource.newInstance(100 * 1024, 100);

    clock = mock(Clock.class);
    queueMetrics = mock(QueueMetrics.class);
    policy = mock(SharingPolicy.class);
    // Mock the agent as well so the plan is never built with a null
    // collaborator.
    agent = mock(ReservationAgent.class);
    replanner = mock(Planner.class);

    when(clock.getTime()).thenReturn(1L);
  }

  /** Drops all fixtures created in {@link #setUp()}. */
  @After
  public void tearDown() {
    resCalc = null;
    minAlloc = null;
    maxAlloc = null;
    totalCapacity = null;

    clock = null;
    queueMetrics = null;
    policy = null;
    agent = null;
    replanner = null;
  }

  /**
   * A newly added reservation is retrievable by id and is reflected in both
   * the total committed resources and the user's consumption at every tick.
   */
  @Test
  public void testAddReservation() throws PlanningException {
    Plan plan = newPlan();
    ReservationId reservationID =
        ReservationSystemTestUtil.getNewReservationId();
    int[] alloc = { 10, 10, 10, 10, 10, 10 };
    int start = 100;
    ReservationAllocation rAllocation =
        newAllocation(reservationID, start, alloc, false);
    Assert.assertNull(plan.getReservationById(reservationID));
    plan.addReservation(rAllocation);
    doAssertions(plan, rAllocation);
    assertUsage(plan, start, alloc);
  }

  /** Adding a reservation with no allocation intervals must not throw. */
  @Test
  public void testAddEmptyReservation() throws PlanningException {
    Plan plan = newPlan();
    ReservationId reservationID =
        ReservationSystemTestUtil.getNewReservationId();
    int[] alloc = {};
    int start = 100;
    ReservationAllocation rAllocation =
        newAllocation(reservationID, start, alloc, false);
    Assert.assertNull(plan.getReservationById(reservationID));
    plan.addReservation(rAllocation);
  }

  /**
   * Adding the same reservation twice is rejected with an
   * {@link IllegalArgumentException} whose message ends with
   * "already exists", and the plan still holds the original reservation.
   */
  @Test
  public void testAddReservationAlreadyExists() throws PlanningException {
    // First add a reservation
    Plan plan = newPlan();
    ReservationId reservationID =
        ReservationSystemTestUtil.getNewReservationId();
    int[] alloc = { 10, 10, 10, 10, 10, 10 };
    int start = 100;
    ReservationAllocation rAllocation =
        newAllocation(reservationID, start, alloc, false);
    Assert.assertNull(plan.getReservationById(reservationID));
    plan.addReservation(rAllocation);
    doAssertions(plan, rAllocation);
    assertUsage(plan, start, alloc);

    // Try to add it again
    try {
      plan.addReservation(rAllocation);
      Assert.fail("Add should fail as it already exists");
    } catch (IllegalArgumentException e) {
      Assert.assertTrue(e.getMessage().endsWith("already exists"));
    }
    doAssertions(plan, rAllocation);
  }

  /**
   * Updating an existing reservation replaces its allocation: after the
   * update the plan reports the new (stepped) allocation at the new start
   * time.
   */
  @Test
  public void testUpdateReservation() throws PlanningException {
    Plan plan = newPlan();
    ReservationId reservationID =
        ReservationSystemTestUtil.getNewReservationId();
    // First add a reservation
    int[] alloc = { 10, 10, 10, 10, 10, 10 };
    int start = 100;
    ReservationAllocation rAllocation =
        newAllocation(reservationID, start, alloc, false);
    Assert.assertNull(plan.getReservationById(reservationID));
    plan.addReservation(rAllocation);
    doAssertions(plan, rAllocation);
    assertUsage(plan, start, alloc);

    // Now update it
    start = 110;
    int[] updatedAlloc = { 0, 5, 10, 10, 5, 0 };
    rAllocation = newAllocation(reservationID, start, updatedAlloc, true);
    plan.updateReservation(rAllocation);
    doAssertions(plan, rAllocation);
    assertUsage(plan, start, withStepOffsets(updatedAlloc));
  }

  /**
   * Updating a reservation that was never added is rejected with an
   * {@link IllegalArgumentException} and leaves the plan without it.
   */
  @Test
  public void testUpdateNonExistingReservation() throws PlanningException {
    Plan plan = newPlan();
    ReservationId reservationID =
        ReservationSystemTestUtil.getNewReservationId();
    // Try to update a reservation without adding
    int[] alloc = { 10, 10, 10, 10, 10, 10 };
    int start = 100;
    ReservationAllocation rAllocation =
        newAllocation(reservationID, start, alloc, false);
    Assert.assertNull(plan.getReservationById(reservationID));
    try {
      plan.updateReservation(rAllocation);
      Assert.fail("Update should fail as it does not exist in the plan");
    } catch (IllegalArgumentException e) {
      Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
    }
    Assert.assertNull(plan.getReservationById(reservationID));
  }

  /**
   * Deleting a reservation removes it from the plan and brings committed
   * resources and user consumption back to zero at every tick it covered.
   */
  @Test
  public void testDeleteReservation() throws PlanningException {
    // First add a reservation
    Plan plan = newPlan();
    ReservationId reservationID =
        ReservationSystemTestUtil.getNewReservationId();
    int[] alloc = { 10, 10, 10, 10, 10, 10 };
    int start = 100;
    ReservationAllocation rAllocation =
        newAllocation(reservationID, start, alloc, true);
    Assert.assertNull(plan.getReservationById(reservationID));
    plan.addReservation(rAllocation);
    doAssertions(plan, rAllocation);
    assertUsage(plan, start, withStepOffsets(alloc));

    // Now delete it
    plan.deleteReservation(reservationID);
    Assert.assertNull(plan.getReservationById(reservationID));
    assertUsage(plan, start, new int[alloc.length]);
  }

  /**
   * Deleting a reservation that was never added is rejected with an
   * {@link IllegalArgumentException}.
   */
  @Test
  public void testDeleteNonExistingReservation() throws PlanningException {
    Plan plan = newPlan();
    ReservationId reservationID =
        ReservationSystemTestUtil.getNewReservationId();
    // Try to delete a reservation without adding
    Assert.assertNull(plan.getReservationById(reservationID));
    try {
      plan.deleteReservation(reservationID);
      Assert.fail("Delete should fail as it does not exist in the plan");
    } catch (IllegalArgumentException e) {
      Assert.assertTrue(e.getMessage().endsWith("does not exist in the plan"));
    }
    Assert.assertNull(plan.getReservationById(reservationID));
  }

  /**
   * Two overlapping reservations are added (ending at ticks 106 and 105);
   * archiving at tick 106 with a valid window of 1 is expected to drop only
   * the shorter one, and archiving at tick 107 the remaining one.
   */
  @Test
  public void testArchiveCompletedReservations() throws PlanningException {
    Plan plan = newPlan();
    ReservationId reservationID1 =
        ReservationSystemTestUtil.getNewReservationId();
    // First add a reservation
    int[] alloc1 = { 10, 10, 10, 10, 10, 10 };
    int start = 100;
    ReservationAllocation rAllocation =
        newAllocation(reservationID1, start, alloc1, false);
    Assert.assertNull(plan.getReservationById(reservationID1));
    plan.addReservation(rAllocation);
    doAssertions(plan, rAllocation);
    assertUsage(plan, start, alloc1);

    // Now add another one
    ReservationId reservationID2 =
        ReservationSystemTestUtil.getNewReservationId();
    int[] alloc2 = { 0, 5, 10, 5, 0 };
    rAllocation = newAllocation(reservationID2, start, alloc2, true);
    Assert.assertNull(plan.getReservationById(reservationID2));
    plan.addReservation(rAllocation);
    Assert.assertNotNull(plan.getReservationById(reservationID2));
    // While both reservations are live, each tick carries the flat first
    // allocation plus the stepped second one.
    int[] combined = new int[alloc2.length];
    for (int i = 0; i < alloc2.length; i++) {
      combined[i] = alloc1[i] + alloc2[i] + i;
    }
    assertUsage(plan, start, combined);

    // Now archive completed reservations
    when(clock.getTime()).thenReturn(106L);
    when(policy.getValidWindow()).thenReturn(1L);
    // will only remove 2nd reservation as only that has fallen out of the
    // archival window
    plan.archiveCompletedReservations(clock.getTime());
    Assert.assertNotNull(plan.getReservationById(reservationID1));
    Assert.assertNull(plan.getReservationById(reservationID2));
    assertUsage(plan, start, alloc1);

    when(clock.getTime()).thenReturn(107L);
    // will remove 1st reservation also as it has fallen out of the archival
    // window
    plan.archiveCompletedReservations(clock.getTime());
    Assert.assertNull(plan.getReservationById(reservationID1));
    assertUsage(plan, start, new int[alloc1.length]);
  }

  /** Builds a plan from the shared fixtures, with step 1 and move-on-expiry. */
  private Plan newPlan() {
    return new InMemoryPlan(queueMetrics, policy, agent, totalCapacity, 1L,
        resCalc, minAlloc, maxAlloc, planName, replanner, true);
  }

  /**
   * Builds a reservation covering {@code [start, start + alloc.length)} for
   * the test user, with one interval per element of {@code alloc}.
   */
  private ReservationAllocation newAllocation(ReservationId reservationID,
      int start, int[] alloc, boolean isStep) {
    Map<ReservationInterval, ReservationRequest> allocations =
        generateAllocation(start, alloc, isStep);
    ReservationDefinition rDef =
        createSimpleReservationDefinition(start, start + alloc.length,
            alloc.length, allocations.values());
    return new InMemoryReservationAllocation(reservationID, rDef, user,
        planName, start, start + alloc.length, allocations, resCalc, minAlloc);
  }

  /** Returns a copy of {@code alloc} with each element increased by its index. */
  private static int[] withStepOffsets(int[] alloc) {
    int[] stepped = new int[alloc.length];
    for (int i = 0; i < alloc.length; i++) {
      stepped[i] = alloc[i] + i;
    }
    return stepped;
  }

  /**
   * Asserts that at each tick {@code start + i} both the plan's total
   * committed resources and the test user's consumption equal
   * {@code containers[i]} containers of 1024 MB / 1 vcore.
   */
  private void assertUsage(Plan plan, int start, int[] containers) {
    for (int i = 0; i < containers.length; i++) {
      Resource expected =
          Resource.newInstance(1024 * containers[i], containers[i]);
      Assert.assertEquals(expected,
          plan.getTotalCommittedResources(start + i));
      Assert.assertEquals(expected,
          plan.getConsumptionForUser(user, start + i));
    }
  }

  /**
   * Asserts that {@code rAllocation} is the only reservation in the plan and
   * that the plan exposes the configuration it was constructed with.
   */
  private void doAssertions(Plan plan, ReservationAllocation rAllocation) {
    ReservationId reservationID = rAllocation.getReservationId();
    Assert.assertNotNull(plan.getReservationById(reservationID));
    Assert.assertEquals(rAllocation, plan.getReservationById(reservationID));
    // assertEquals reports the actual size on failure, unlike assertTrue.
    Assert.assertEquals(1, ((InMemoryPlan) plan).getAllReservations().size());
    Assert.assertEquals(rAllocation.getEndTime(), plan.getLastEndTime());
    Assert.assertEquals(totalCapacity, plan.getTotalCapacity());
    Assert.assertEquals(minAlloc, plan.getMinimumAllocation());
    Assert.assertEquals(maxAlloc, plan.getMaximumAllocation());
    Assert.assertEquals(resCalc, plan.getResourceCalculator());
    Assert.assertEquals(planName, plan.getQueueName());
    Assert.assertTrue(plan.getMoveOnExpiry());
  }

  /**
   * Creates a reservation definition whose requests are a copy of
   * {@code resources}, interpreted with
   * {@link ReservationRequestInterpreter#R_ALL}.
   *
   * @param arrival arrival time set on the definition
   * @param deadline deadline set on the definition
   * @param duration accepted for call-site symmetry; not used by this helper
   * @param resources the requests to copy into the definition
   */
  private ReservationDefinition createSimpleReservationDefinition(long arrival,
      long deadline, long duration, Collection<ReservationRequest> resources) {
    ReservationDefinition rDef = new ReservationDefinitionPBImpl();
    ReservationRequests reqs = new ReservationRequestsPBImpl();
    reqs.setReservationResources(new ArrayList<ReservationRequest>(resources));
    reqs.setInterpreter(ReservationRequestInterpreter.R_ALL);
    rDef.setReservationRequests(reqs);
    rDef.setArrival(arrival);
    rDef.setDeadline(deadline);
    return rDef;
  }

  /**
   * Builds one unit-length interval per element of {@code alloc}, starting at
   * {@code startTime}. Each interval requests {@code alloc[i]} containers of
   * 1024 MB / 1 vcore, or {@code alloc[i] + i} when {@code isStep} is set.
   */
  private Map<ReservationInterval, ReservationRequest> generateAllocation(
      int startTime, int[] alloc, boolean isStep) {
    Map<ReservationInterval, ReservationRequest> req =
        new HashMap<ReservationInterval, ReservationRequest>();
    for (int i = 0; i < alloc.length; i++) {
      int numContainers = isStep ? alloc[i] + i : alloc[i];
      ReservationRequest rr =
          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
              numContainers);
      req.put(new ReservationInterval(startTime + i, startTime + i + 1), rr);
    }
    return req;
  }

}
// Source: http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java
// (new file, mode 100644, index 0000000..f4c4581)
package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link InMemoryReservationAllocation}: the accessors must echo
 * the constructor arguments, and the resources reported at each tick must
 * match the per-interval requests.
 */
public class TestInMemoryReservationAllocation {

  private String user = "yarn";
  private String planName = "test-reservation";
  private ResourceCalculator resCalc;
  private Resource minAlloc;

  private Random rand = new Random();

  /** Creates the calculator and the 1 MB / 1 vcore minimum allocation. */
  @Before
  public void setUp() {
    resCalc = new DefaultResourceCalculator();
    minAlloc = Resource.newInstance(1, 1);
  }

  /** Clears every fixture field. */
  @After
  public void tearDown() {
    user = null;
    planName = null;
    resCalc = null;
    minAlloc = null;
  }

  /** Flat allocation of 10 containers per tick, no gangs. */
  @Test
  public void testBlocks() {
    verifyAllocation(new int[] { 10, 10, 10, 10, 10, 10 }, false, false);
  }

  /** Flat base allocation with the step offset applied, no gangs. */
  @Test
  public void testSteps() {
    verifyAllocation(new int[] { 10, 10, 10, 10, 10, 10 }, true, false);
  }

  /** Rising-then-falling base allocation with the step offset, no gangs. */
  @Test
  public void testSkyline() {
    verifyAllocation(new int[] { 0, 5, 10, 10, 5, 0 }, true, false);
  }

  /** An allocation with no intervals still echoes its metadata. */
  @Test
  public void testZeroAlloaction() {
    ReservationId id =
        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
    int[] containers = {};
    long begin = 0;
    long end = begin + containers.length + 1;
    ReservationDefinition definition =
        createSimpleReservationDefinition(begin, end, containers.length);
    Map<ReservationInterval, ReservationRequest> requests =
        new HashMap<ReservationInterval, ReservationRequest>();
    ReservationAllocation reservation =
        new InMemoryReservationAllocation(id, definition, user, planName,
            begin, end, requests, resCalc, minAlloc);
    doAssertions(reservation, id, definition, requests, (int) begin,
        containers);
    Assert.assertFalse(reservation.containsGangs());
  }

  /** Flat allocation whose requests carry a concurrency; gangs expected. */
  @Test
  public void testGangAlloaction() {
    verifyAllocation(new int[] { 10, 10, 10, 10, 10, 10 }, false, true);
  }

  /**
   * Shared driver: builds a reservation starting at tick 100 from
   * {@code containers}, checks its metadata, checks that
   * {@code containsGangs()} agrees with {@code isGang}, and checks the
   * resources reported at every tick.
   */
  private void verifyAllocation(int[] containers, boolean isStep,
      boolean isGang) {
    int begin = 100;
    ReservationId id =
        ReservationId.newInstance(rand.nextLong(), rand.nextLong());
    ReservationDefinition definition =
        createSimpleReservationDefinition(begin,
            begin + containers.length + 1, containers.length);
    Map<ReservationInterval, ReservationRequest> requests =
        generateAllocation(begin, containers, isStep, isGang);
    ReservationAllocation reservation =
        new InMemoryReservationAllocation(id, definition, user, planName,
            begin, begin + containers.length + 1, requests, resCalc,
            minAlloc);

    doAssertions(reservation, id, definition, requests, begin, containers);
    if (isGang) {
      Assert.assertTrue(reservation.containsGangs());
    } else {
      Assert.assertFalse(reservation.containsGangs());
    }
    for (int tick = 0; tick < containers.length; tick++) {
      int expected = isStep ? containers[tick] + tick : containers[tick];
      Assert.assertEquals(Resource.newInstance(1024 * expected, expected),
          reservation.getResourcesAtTime(begin + tick));
    }
  }

  /**
   * Asserts that the allocation's accessors return the values it was built
   * with; the end time is expected to be {@code start + alloc.length + 1}.
   */
  private void doAssertions(ReservationAllocation rAllocation,
      ReservationId reservationID, ReservationDefinition rDef,
      Map<ReservationInterval, ReservationRequest> allocations, int start,
      int[] alloc) {
    Assert.assertEquals(reservationID, rAllocation.getReservationId());
    Assert.assertEquals(rDef, rAllocation.getReservationDefinition());
    Assert.assertEquals(allocations, rAllocation.getAllocationRequests());
    Assert.assertEquals(user, rAllocation.getUser());
    Assert.assertEquals(planName, rAllocation.getPlanName());
    Assert.assertEquals(start, rAllocation.getStartTime());
    Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime());
  }

  /**
   * Creates a definition holding a single request (1024 MB / 1 vcore, with
   * arguments 1, 1 and {@code duration}) interpreted with
   * {@link ReservationRequestInterpreter#R_ALL}.
   */
  private ReservationDefinition createSimpleReservationDefinition(long arrival,
      long deadline, long duration) {
    ReservationRequest singleAsk =
        ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1,
            duration);
    ReservationRequests asks = new ReservationRequestsPBImpl();
    asks.setReservationResources(Collections.singletonList(singleAsk));
    asks.setInterpreter(ReservationRequestInterpreter.R_ALL);

    ReservationDefinition definition = new ReservationDefinitionPBImpl();
    definition.setReservationRequests(asks);
    definition.setArrival(arrival);
    definition.setDeadline(deadline);
    return definition;
  }

  /**
   * Builds one unit-length interval per element of {@code alloc}, starting at
   * {@code startTime}. Each requests {@code alloc[i]} containers (plus
   * {@code i} when {@code isStep}); when {@code isGang} is set the request's
   * concurrency is set to its container count.
   */
  private Map<ReservationInterval, ReservationRequest> generateAllocation(
      int startTime, int[] alloc, boolean isStep, boolean isGang) {
    Map<ReservationInterval, ReservationRequest> byInterval =
        new HashMap<ReservationInterval, ReservationRequest>();
    for (int offset = 0; offset < alloc.length; offset++) {
      int containers = isStep ? alloc[offset] + offset : alloc[offset];
      ReservationRequest request =
          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
              containers);
      if (isGang) {
        request.setConcurrency(containers);
      }
      int from = startTime + offset;
      byInterval.put(new ReservationInterval(from, from + 1), request);
    }
    return byInterval;
  }

}

// Source: http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8b2cd8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestRLESparseResourceAllocation.java
// (new file, mode 100644, index 0000000..ab0de6b)
package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link RLESparseResourceAllocation}: intervals are added, the
 * capacity is probed inside and outside the covered range, then the same
 * intervals are removed and the structure is expected to be empty again.
 */
public class TestRLESparseResourceAllocation {

  private static final Logger LOG = LoggerFactory
      .getLogger(TestRLESparseResourceAllocation.class);

  /** Flat allocation of 10 containers per tick. */
  @Test
  public void testBlocks() {
    verifyAddThenRemove(new int[] { 10, 10, 10, 10, 10, 10 }, false);
  }

  /** Flat base allocation with the step offset applied. */
  @Test
  public void testSteps() {
    verifyAddThenRemove(new int[] { 10, 10, 10, 10, 10, 10 }, true);
  }

  /** Rising-then-falling base allocation with the step offset applied. */
  @Test
  public void testSkyline() {
    verifyAddThenRemove(new int[] { 0, 5, 10, 10, 5, 0 }, true);
  }

  /**
   * Adding a zero-resource request over {@code [0, Long.MAX_VALUE)} must
   * leave the structure empty and report zero capacity at an arbitrary time.
   */
  @Test
  public void testZeroAlloaction() {
    ResourceCalculator calculator = new DefaultResourceCalculator();
    Resource minimum = Resource.newInstance(1, 1);
    RLESparseResourceAllocation sparse =
        new RLESparseResourceAllocation(calculator, minimum);
    ReservationInterval everything = new ReservationInterval(0, Long.MAX_VALUE);
    ReservationRequest nothing =
        ReservationRequest.newInstance(Resource.newInstance(0, 0), (0));
    sparse.addInterval(everything, nothing);
    LOG.info(sparse.toString());
    Assert.assertEquals(Resource.newInstance(0, 0),
        sparse.getCapacityAtTime(new Random().nextLong()));
    Assert.assertTrue(sparse.isEmpty());
  }

  /**
   * Shared driver: adds the intervals generated from {@code alloc} starting
   * at tick 100, checks the capacity before, inside and after the covered
   * range, removes the same intervals and checks the structure is empty.
   */
  private void verifyAddThenRemove(int[] alloc, boolean isStep) {
    ResourceCalculator calculator = new DefaultResourceCalculator();
    Resource minimum = Resource.newInstance(1, 1);
    RLESparseResourceAllocation sparse =
        new RLESparseResourceAllocation(calculator, minimum);
    int begin = 100;
    Set<Entry<ReservationInterval, ReservationRequest>> entries =
        generateAllocation(begin, alloc, isStep).entrySet();

    for (Entry<ReservationInterval, ReservationRequest> entry : entries) {
      sparse.addInterval(entry.getKey(), entry.getValue());
    }
    LOG.info(sparse.toString());
    Assert.assertFalse(sparse.isEmpty());
    Assert.assertEquals(Resource.newInstance(0, 0),
        sparse.getCapacityAtTime(99));
    Assert.assertEquals(Resource.newInstance(0, 0),
        sparse.getCapacityAtTime(begin + alloc.length + 1));
    for (int tick = 0; tick < alloc.length; tick++) {
      int expected = isStep ? alloc[tick] + tick : alloc[tick];
      Assert.assertEquals(Resource.newInstance(1024 * expected, expected),
          sparse.getCapacityAtTime(begin + tick));
    }
    Assert.assertEquals(Resource.newInstance(0, 0),
        sparse.getCapacityAtTime(begin + alloc.length + 2));

    for (Entry<ReservationInterval, ReservationRequest> entry : entries) {
      sparse.removeInterval(entry.getKey(), entry.getValue());
    }
    LOG.info(sparse.toString());
    for (int tick = 0; tick < alloc.length; tick++) {
      Assert.assertEquals(Resource.newInstance(0, 0),
          sparse.getCapacityAtTime(begin + tick));
    }
    Assert.assertTrue(sparse.isEmpty());
  }

  /**
   * Builds one unit-length interval per element of {@code alloc}, starting at
   * {@code startTime}; each requests {@code alloc[i]} containers of
   * 1024 MB / 1 vcore, or {@code alloc[i] + i} when {@code isStep} is set.
   */
  private Map<ReservationInterval, ReservationRequest> generateAllocation(
      int startTime, int[] alloc, boolean isStep) {
    Map<ReservationInterval, ReservationRequest> byInterval =
        new HashMap<ReservationInterval, ReservationRequest>();
    for (int offset = 0; offset < alloc.length; offset++) {
      int containers = isStep ? alloc[offset] + offset : alloc[offset];
      int from = startTime + offset;
      byInterval.put(new ReservationInterval(from, from + 1),
          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
              (containers)));
    }
    return byInterval;
  }

}
