Repository: hadoop Updated Branches: refs/heads/branch-2.6 b91715bc8 -> 58a6142c1
YARN-3849. Too much of preemption activity causing continuos killing of containers across queues. (Sunil G via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/58a6142c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/58a6142c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/58a6142c Branch: refs/heads/branch-2.6 Commit: 58a6142c1455cb622ffc7d520df30d8f6dc5680b Parents: b91715b Author: Rohith Sharma K S <[email protected]> Authored: Mon Jan 11 12:02:38 2016 +0530 Committer: Rohith Sharma K S <[email protected]> Committed: Mon Jan 11 12:02:38 2016 +0530 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 2 + .../ProportionalCapacityPreemptionPolicy.java | 3 +- ...estProportionalCapacityPreemptionPolicy.java | 198 +++++++++++++++---- 3 files changed, 165 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/58a6142c/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e46f2f7..8f35efc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -43,6 +43,8 @@ Release 2.6.4 - UNRELEASED YARN-4180. AMLauncher does not retry on failures when talking to NM. (adhoot) + YARN-3849. Too much of preemption activity causing continuos killing of + containers across queues. (Sunil G via wangda) Release 2.6.3 - 2015-12-17 http://git-wip-us.apache.org/repos/asf/hadoop/blob/58a6142c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 0f48b0c..854d2c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -632,11 +632,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic TempQueue ret; synchronized (root) { String queueName = root.getQueueName(); - float absUsed = root.getAbsoluteUsedCapacity(); float absCap = root.getAbsoluteCapacity(); float absMaxCap = root.getAbsoluteMaximumCapacity(); - Resource current = Resources.multiply(clusterResources, absUsed); + Resource current = root.getQueueResourceUsage().getUsed(); Resource guaranteed = Resources.multiply(clusterResources, absCap); Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap); if (root instanceof LeafQueue) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/58a6142c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 24e70bb..bca2def 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; @@ -67,7 +68,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -382,7 +385,7 @@ public class TestProportionalCapacityPreemptionPolicy { // we verify both that C has priority on B and D (has it has >0 guarantees) // and that B and D are force to share their over capacity fairly (as they // are both zero-guarantees) hence D sees some of its containers preempted - verify(mDisp, times(14)).handle(argThat(new IsPreemptionRequestFor(appC))); + verify(mDisp, times(15)).handle(argThat(new IsPreemptionRequestFor(appC))); } @@ -407,8 +410,8 @@ public class TestProportionalCapacityPreemptionPolicy { // XXX note: compensating for rounding error in Resources.multiplyTo // which is likely triggered since we use small numbers for readability - verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA))); - verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE))); + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE))); } @Test @@ -571,7 +574,35 @@ public class TestProportionalCapacityPreemptionPolicy { verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA))); setAMContainer = false; } - + + @Test + public void testPreemptionWithVCoreResource() { + int[][] qData = new int[][] { + // / A B + { 100, 100, 100 }, // maxcap + { 5, 1, 1 }, // apps + { 2, 0, 0 }, // subqueues + }; + + // Resources can be set like memory:vcores + String[][] resData = new String[][] { + // / A B + { "100:100", "50:50", "50:50" },// abs + { "10:100", "10:100", "0" }, // used + { "70:20", "70:20", "10:100" }, // pending + { "0", "0", "0" }, // reserved + { "-1", "1:10", "1:10" }, // req granularity + }; + + // Passing last param as TRUE to use DominantResourceCalculator + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData, resData, + true); + policy.editSchedule(); + + // 5 containers will be preempted here + verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA))); + } + static class IsPreemptionRequestFor extends ArgumentMatcher<ContainerPreemptEvent> { private final ApplicationAttemptId appAttId; @@ -598,37 +629,103 @@ public class TestProportionalCapacityPreemptionPolicy { ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) { ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock); + Resource clusterResources = + Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); ParentQueue mRoot = buildMockRootQueue(rand, qData); when(mCS.getRootQueue()).thenReturn(mRoot); - Resource clusterResources = - Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0); when(mCS.getClusterResource()).thenReturn(clusterResources); return policy; } + ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, + String[][] resData) { + return buildPolicy(qData, resData, false); + } + + ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData, + String[][] resData, boolean useDominantResourceCalculator) { + if (useDominantResourceCalculator) { + when(mCS.getResourceCalculator()).thenReturn( + new DominantResourceCalculator()); + } + ProportionalCapacityPreemptionPolicy policy = + new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock); + Resource clusterResources = leafAbsCapacities( + parseResourceDetails(resData[0]), qData[2]); + when(mCS.getClusterResource()).thenReturn(clusterResources); + ParentQueue mRoot = buildMockRootQueue(rand, resData, qData); + when(mCS.getRootQueue()).thenReturn(mRoot); + + return policy; + } + ParentQueue buildMockRootQueue(Random r, int[]... queueData) { - int[] abs = queueData[0]; - int[] maxCap = queueData[1]; - int[] used = queueData[2]; - int[] pending = queueData[3]; - int[] reserved = queueData[4]; - int[] apps = queueData[5]; - int[] gran = queueData[6]; - int[] queues = queueData[7]; + Resource[] abs = generateResourceList(queueData[0]); + Resource[] used = generateResourceList(queueData[2]); + Resource[] pending = generateResourceList(queueData[3]); + Resource[] reserved = generateResourceList(queueData[4]); + Resource[] gran = generateResourceList(queueData[6]); + int[] maxCap = queueData[1]; + int[] apps = queueData[5]; + int[] queues = queueData[7]; + + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); + } + + private ParentQueue buildMockRootQueue(Random rand2, String[][] resData, + int[][] queueData) { + Resource[] abs = parseResourceDetails(resData[0]); + Resource[] used = parseResourceDetails(resData[1]); + Resource[] pending = parseResourceDetails(resData[2]); + Resource[] reserved = parseResourceDetails(resData[3]); + Resource[] gran = parseResourceDetails(resData[4]); + int[] maxCap = queueData[0]; + int[] apps = queueData[1]; + int[] queues = queueData[2]; + + return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); + } + + Resource[] parseResourceDetails(String[] resData) { + List<Resource> resourceList = new ArrayList<Resource>(); + for (int i = 0; i < resData.length; i++) { + String[] resource = resData[i].split(":"); + if (resource.length == 1) { + resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), 0)); + } else { + resourceList.add(Resource.newInstance(Integer.valueOf(resource[0]), + Integer.valueOf(resource[1]))); + } + } + return resourceList.toArray(new Resource[resourceList.size()]); + } - return mockNested(abs, maxCap, used, pending, reserved, apps, gran, queues); + Resource[] generateResourceList(int[] qData) { + List<Resource> resourceList = new ArrayList<Resource>(); + for (int i = 0; i < qData.length; i++) { + resourceList.add(Resource.newInstance(qData[i], 0)); + } + return resourceList.toArray(new Resource[resourceList.size()]); } - ParentQueue mockNested(int[] abs, int[] maxCap, int[] used, - int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) { - float tot = leafAbsCapacities(abs, queues); + ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used, + Resource[] pending, Resource[] reserved, int[] apps, Resource[] gran, + int[] queues) { + ResourceCalculator rc = mCS.getResourceCalculator(); + Resource tot = leafAbsCapacities(abs, queues); Deque<ParentQueue> pqs = new LinkedList<ParentQueue>(); ParentQueue root = mockParentQueue(null, queues[0], pqs); + ResourceUsage resUsage = new ResourceUsage(); + resUsage.setUsed(used[0]); when(root.getQueueName()).thenReturn("/"); - when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot); - when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot); - when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot); + when(root.getAbsoluteUsedCapacity()).thenReturn( + Resources.divide(rc, tot, used[0], tot)); + when(root.getAbsoluteCapacity()).thenReturn( + Resources.divide(rc, tot, abs[0], tot)); + when(root.getAbsoluteMaximumCapacity()).thenReturn( + maxCap[0] / (float) tot.getMemory()); + when(root.getQueueResourceUsage()).thenReturn(resUsage); for (int i = 1; i < queues.length; ++i) { final CSQueue q; @@ -636,14 +733,20 @@ public class TestProportionalCapacityPreemptionPolicy { final String queueName = "queue" + ((char)('A' + i - 1)); if (queues[i] > 0) { q = mockParentQueue(p, queues[i], pqs); + ResourceUsage resUsagePerQueue = new ResourceUsage(); + resUsagePerQueue.setUsed(used[i]); + when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue); } else { q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran); } when(q.getParent()).thenReturn(p); when(q.getQueueName()).thenReturn(queueName); - when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot); - when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot); - when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot); + when(q.getAbsoluteUsedCapacity()).thenReturn( + Resources.divide(rc, tot, used[i], tot)); + when(q.getAbsoluteCapacity()).thenReturn( + Resources.divide(rc, tot, abs[i], tot)); + when(q.getAbsoluteMaximumCapacity()).thenReturn( + maxCap[i] / (float) tot.getMemory()); } assert 0 == pqs.size(); return root; @@ -663,11 +766,17 @@ public class TestProportionalCapacityPreemptionPolicy { return pq; } - LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs, - int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) { + LeafQueue mockLeafQueue(ParentQueue p, Resource tot, int i, Resource[] abs, + Resource[] used, Resource[] pending, Resource[] reserved, int[] apps, + Resource[] gran) { LeafQueue lq = mock(LeafQueue.class); - when(lq.getTotalResourcePending()).thenReturn( - Resource.newInstance(pending[i], 0)); + ResourceCalculator rc = mCS.getResourceCalculator(); + when(lq.getTotalResourcePending()).thenReturn(pending[i]); + // need to set pending resource in resource usage as well + ResourceUsage ru = new ResourceUsage(); + ru.setPending(pending[i]); + ru.setUsed(used[i]); + when(lq.getQueueResourceUsage()).thenReturn(ru); // consider moving where CapacityScheduler::comparator accessible NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>( new Comparator<FiCaSchedulerApp>() { @@ -679,9 +788,9 @@ public class TestProportionalCapacityPreemptionPolicy { }); // applications are added in global L->R order in queues if (apps[i] != 0) { - int aUsed = used[i] / apps[i]; - int aPending = pending[i] / apps[i]; - int aReserve = reserved[i] / apps[i]; + Resource aUsed = Resources.divideAndCeil(rc, used[i], apps[i]); + Resource aPending = Resources.divideAndCeil(rc, pending[i], apps[i]); + Resource aReserve = Resources.divideAndCeil(rc, reserved[i], apps[i]); for (int a = 0; a < apps[i]; ++a) { qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i])); ++appAlloc; @@ -695,9 +804,10 @@ public class TestProportionalCapacityPreemptionPolicy { return lq; } - FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved, - int gran) { + FiCaSchedulerApp mockApp(int qid, int id, Resource used, Resource pending, + Resource reserved, Resource gran) { FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); + ResourceCalculator rc = mCS.getResourceCalculator(); ApplicationId appId = ApplicationId.newInstance(TS, id); ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0); @@ -705,22 +815,28 @@ public class TestProportionalCapacityPreemptionPolicy { when(app.getApplicationAttemptId()).thenReturn(appAttId); int cAlloc = 0; - Resource unit = Resource.newInstance(gran, 0); + Resource unit = gran; List<RMContainer> cReserved = new ArrayList<RMContainer>(); - for (int i = 0; i < reserved; i += gran) { + Resource resIter = Resource.newInstance(0, 0); + for (; Resources.lessThan(rc, mCS.getClusterResource(), resIter, reserved); Resources + .addTo(resIter, gran)) { cReserved.add(mockContainer(appAttId, cAlloc, unit, 1)); ++cAlloc; } when(app.getReservedContainers()).thenReturn(cReserved); List<RMContainer> cLive = new ArrayList<RMContainer>(); - for (int i = 0; i < used; i += gran) { + Resource usedIter = Resource.newInstance(0, 0); + int i = 0; + for (; Resources.lessThan(rc, mCS.getClusterResource(), usedIter, used); Resources + .addTo(usedIter, gran)) { if(setAMContainer && i == 0){ cLive.add(mockContainer(appAttId, cAlloc, unit, 0)); }else{ cLive.add(mockContainer(appAttId, cAlloc, unit, 1)); } ++cAlloc; + ++i; } when(app.getLiveContainers()).thenReturn(cLive); return app; @@ -752,6 +868,16 @@ public class TestProportionalCapacityPreemptionPolicy { return ret; } + static Resource leafAbsCapacities(Resource[] abs, int[] subqueues) { + Resource ret = Resource.newInstance(0, 0); + for (int i = 0; i < abs.length; ++i) { + if (0 == subqueues[i]) { + Resources.addTo(ret, abs[i]); + } + } + return ret; + } + void printString(CSQueue nq, String indent) { if (nq instanceof ParentQueue) { System.out.println(indent + nq.getQueueName()
