YARN-3873. PendingApplications in LeafQueue should also use OrderingPolicy. (Sunil G via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cf9d3c92 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cf9d3c92 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cf9d3c92 Branch: refs/heads/HDFS-7285 Commit: cf9d3c925608e8bc650d43975382ed3014081057 Parents: 8f73bdd Author: Wangda Tan <wan...@apache.org> Authored: Mon Aug 10 14:54:55 2015 -0700 Committer: Wangda Tan <wan...@apache.org> Committed: Mon Aug 10 14:54:55 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/CapacityScheduler.java | 18 +---- .../capacity/CapacitySchedulerContext.java | 2 - .../scheduler/capacity/LeafQueue.java | 55 ++++++++----- ...pacityPreemptionPolicyForNodePartitions.java | 10 ++- .../capacity/TestApplicationLimits.java | 14 +--- .../capacity/TestCapacityScheduler.java | 83 +++++++------------- .../scheduler/capacity/TestChildQueueOrder.java | 2 - .../scheduler/capacity/TestLeafQueue.java | 49 ++++++------ .../scheduler/capacity/TestParentQueue.java | 2 - .../scheduler/capacity/TestReservations.java | 2 - 11 files changed, 106 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7d34eeb..5e27a2f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -161,6 +161,9 @@ Release 2.8.0 - UNRELEASED YARN-3948. Display Application Priority in RM Web UI.(Sunil G via rohithsharmaks) + YARN-3873. PendingApplications in LeafQueue should also use OrderingPolicy. + (Sunil G via wangda) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 1d353a6..b4d0095 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -156,17 +156,6 @@ public class CapacityScheduler extends static final PartitionedQueueComparator partitionedQueueComparator = new PartitionedQueueComparator(); - public static final Comparator<FiCaSchedulerApp> applicationComparator = - new Comparator<FiCaSchedulerApp>() { - @Override - public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { - if (!a1.getPriority().equals(a2.getPriority())) { - return a1.getPriority().compareTo(a2.getPriority()); - } - return a1.getApplicationId().compareTo(a2.getApplicationId()); - } - }; - @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -275,11 +264,6 @@ public class CapacityScheduler extends } @Override - public Comparator<FiCaSchedulerApp> getApplicationComparator() { - return applicationComparator; - } - - @Override public ResourceCalculator getResourceCalculator() { return calculator; } @@ -1633,7 +1617,7 @@ public class CapacityScheduler extends if (disposableLeafQueue.getNumApplications() > 0) { throw new SchedulerDynamicEditException("The queue " + queueName + " is not empty " + disposableLeafQueue.getApplications().size() - + " active apps " + disposableLeafQueue.pendingApplications.size() + + " active apps " + disposableLeafQueue.getPendingApplications().size() + " pending apps"); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 707c463..2a0dd0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -54,8 +54,6 @@ public interface CapacitySchedulerContext { */ Configuration getConf(); - Comparator<FiCaSchedulerApp> getApplicationComparator(); - ResourceCalculator getResourceCalculator(); Comparator<CSQueue> getNonPartitionedQueueComparator(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 2691c33..5976f58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -93,7 +93,7 @@ public class LeafQueue extends AbstractCSQueue { private Priority defaultAppPriorityPerQueue; - Set<FiCaSchedulerApp> pendingApplications; + private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null; private volatile float minimumAllocationFactor; @@ -117,8 +117,7 @@ public class LeafQueue extends AbstractCSQueue { private volatile ResourceLimits cachedResourceLimitsForHeadroom = null; - private OrderingPolicy<FiCaSchedulerApp> - orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>(); + private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null; // record all ignore partition exclusivityRMContainer, this will be used to do // preemption, key is the partition of the RMContainer allocated on @@ -136,11 +135,6 @@ public class LeafQueue extends AbstractCSQueue { LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); } - - Comparator<FiCaSchedulerApp> applicationComparator = - cs.getApplicationComparator(); - this.pendingApplications = - new TreeSet<FiCaSchedulerApp>(applicationComparator); setupQueueConfigs(cs.getClusterResource()); } @@ -164,6 +158,8 @@ public class LeafQueue extends AbstractCSQueue { CapacitySchedulerConfiguration conf = csContext.getConfiguration(); setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath())); + setPendingAppsOrderingPolicy(conf + .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath())); userLimit = conf.getUserLimit(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath()); @@ -328,7 +324,7 @@ public class LeafQueue extends AbstractCSQueue { } public synchronized int getNumPendingApplications() { - return pendingApplications.size(); + return pendingOrderingPolicy.getNumSchedulableEntities(); } public synchronized int getNumActiveApplications() { @@ -594,8 +590,8 @@ public class LeafQueue extends AbstractCSQueue { Resource amLimit = getAMResourceLimit(); Resource userAMLimit = getUserAMResourceLimit(); - for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator(); - i.hasNext(); ) { + for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy() + .getAssignmentIterator(); i.hasNext();) { FiCaSchedulerApp application = i.next(); ApplicationId applicationId = application.getApplicationId(); // Check am resource limit @@ -662,7 +658,7 @@ public class LeafQueue extends AbstractCSQueue { User user) { // Accept user.submitApplication(); - pendingApplications.add(application); + getPendingAppsOrderingPolicy().addSchedulableEntity(application); applicationAttemptMap.put(application.getApplicationAttemptId(), application); // Activate applications @@ -701,7 +697,7 @@ public class LeafQueue extends AbstractCSQueue { boolean wasActive = orderingPolicy.removeSchedulableEntity(application); if (!wasActive) { - pendingApplications.remove(application); + pendingOrderingPolicy.removeSchedulableEntity(application); } else { queueUsage.decAMUsed(application.getAMResource()); user.getResourceUsage().decAMUsed(application.getAMResource()); @@ -1354,7 +1350,14 @@ public class LeafQueue extends AbstractCSQueue { } getParent().recoverContainer(clusterResource, attempt, rmContainer); } - + + /** + * Obtain (read-only) collection of pending applications. + */ + public Collection<FiCaSchedulerApp> getPendingApplications() { + return pendingOrderingPolicy.getSchedulableEntities(); + } + /** * Obtain (read-only) collection of active applications. */ @@ -1375,7 +1378,8 @@ public class LeafQueue extends AbstractCSQueue { @Override public synchronized void collectSchedulerApplications( Collection<ApplicationAttemptId> apps) { - for (FiCaSchedulerApp pendingApp : pendingApplications) { + for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy + .getSchedulableEntities()) { apps.add(pendingApp.getApplicationAttemptId()); } for (FiCaSchedulerApp app : @@ -1450,9 +1454,10 @@ public class LeafQueue extends AbstractCSQueue { public synchronized void setOrderingPolicy( OrderingPolicy<FiCaSchedulerApp> orderingPolicy) { - orderingPolicy.addAllSchedulableEntities( - this.orderingPolicy.getSchedulableEntities() - ); + if (null != this.orderingPolicy) { + orderingPolicy.addAllSchedulableEntities(this.orderingPolicy + .getSchedulableEntities()); + } this.orderingPolicy = orderingPolicy; } @@ -1461,6 +1466,20 @@ public class LeafQueue extends AbstractCSQueue { return defaultAppPriorityPerQueue; } + public synchronized OrderingPolicy<FiCaSchedulerApp> + getPendingAppsOrderingPolicy() { + return pendingOrderingPolicy; + } + public synchronized void setPendingAppsOrderingPolicy( + OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy) { + if (null != this.pendingOrderingPolicy) { + pendingOrderingPolicy + .addAllSchedulableEntities(this.pendingOrderingPolicy + .getSchedulableEntities()); + } + this.pendingOrderingPolicy = pendingOrderingPolicy; + } + /* * Holds shared values used by all applications in * the queue to calculate headroom on demand http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index d6f64bf..bbcb625 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1127,8 +1128,13 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { when(parentQueue.getChildQueues()).thenReturn(children); } else { LeafQueue leafQueue = mock(LeafQueue.class); - final TreeSet<FiCaSchedulerApp> apps = - new TreeSet<>(CapacityScheduler.applicationComparator); + final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>( + new Comparator<FiCaSchedulerApp>() { + @Override + public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { + return a1.getApplicationId().compareTo(a2.getApplicationId()); + } + }); when(leafQueue.getApplications()).thenReturn(apps); OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class); when(so.getPreemptionIterator()).thenAnswer(new Answer() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index fa2a8e3..8c4ffd4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -93,8 +93,6 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(16*GB, 32)); when(csContext.getClusterResource()). thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); - when(csContext.getApplicationComparator()). - thenReturn(CapacityScheduler.applicationComparator); when(csContext.getNonPartitionedQueueComparator()). thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). @@ -255,8 +253,6 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 16)); - when(csContext.getApplicationComparator()). - thenReturn(CapacityScheduler.applicationComparator); when(csContext.getNonPartitionedQueueComparator()). thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); @@ -499,7 +495,7 @@ public class TestApplicationLimits { assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); - assertTrue(queue.pendingApplications.contains(app_2)); + assertTrue(queue.getPendingApplications().contains(app_2)); // Submit fourth application, should remain pending FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0, @@ -509,7 +505,7 @@ public class TestApplicationLimits { assertEquals(2, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(2, queue.getNumPendingApplications(user_0)); - assertTrue(queue.pendingApplications.contains(app_3)); + assertTrue(queue.getPendingApplications().contains(app_3)); // Kill 3rd pending application queue.finishApplicationAttempt(app_2, A); @@ -517,7 +513,7 @@ public class TestApplicationLimits { assertEquals(1, queue.getNumPendingApplications()); assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(1, queue.getNumPendingApplications(user_0)); - assertFalse(queue.pendingApplications.contains(app_2)); + assertFalse(queue.getPendingApplications().contains(app_2)); assertFalse(queue.getApplications().contains(app_2)); // Finish 1st application, app_3 should become active @@ -527,7 +523,7 @@ public class TestApplicationLimits { assertEquals(2, queue.getNumActiveApplications(user_0)); assertEquals(0, queue.getNumPendingApplications(user_0)); assertTrue(queue.getApplications().contains(app_3)); - assertFalse(queue.pendingApplications.contains(app_3)); + assertFalse(queue.getPendingApplications().contains(app_3)); assertFalse(queue.getApplications().contains(app_0)); // Finish 2nd application @@ -562,8 +558,6 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(GB)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB)); - when(csContext.getApplicationComparator()). - thenReturn(CapacityScheduler.applicationComparator); when(csContext.getNonPartitionedQueueComparator()). thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 6933e41..279299e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -889,61 +889,36 @@ public class TestCapacityScheduler { 0, alloc1Response.getAllocatedContainers().size()); rm.stop(); } - - @Test (timeout = 5000) - public void testApplicationComparator() - { - CapacityScheduler cs = new CapacityScheduler(); - Comparator<FiCaSchedulerApp> appComparator= cs.getApplicationComparator(); - ApplicationId id1 = ApplicationId.newInstance(1, 1); - ApplicationId id2 = ApplicationId.newInstance(1, 2); - ApplicationId id3 = ApplicationId.newInstance(2, 1); - Priority priority = Priority.newInstance(0); - //same clusterId - FiCaSchedulerApp app1 = Mockito.mock(FiCaSchedulerApp.class); - when(app1.getApplicationId()).thenReturn(id1); - when(app1.getPriority()).thenReturn(priority); - FiCaSchedulerApp app2 = Mockito.mock(FiCaSchedulerApp.class); - when(app2.getApplicationId()).thenReturn(id2); - when(app2.getPriority()).thenReturn(priority); - FiCaSchedulerApp app3 = Mockito.mock(FiCaSchedulerApp.class); - when(app3.getApplicationId()).thenReturn(id3); - when(app3.getPriority()).thenReturn(priority); - assertTrue(appComparator.compare(app1, app2) < 0); - //different clusterId - assertTrue(appComparator.compare(app1, app3) < 0); - assertTrue(appComparator.compare(app2, app3) < 0); - } - @Test - public void testGetAppsInQueue() throws Exception { - Application application_0 = new Application("user_0", "a1", resourceManager); - application_0.submit(); - - Application application_1 = new Application("user_0", "a2", resourceManager); - application_1.submit(); - - Application application_2 = new Application("user_0", "b2", resourceManager); - application_2.submit(); - - ResourceScheduler scheduler = resourceManager.getResourceScheduler(); - - List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); - assertEquals(1, appsInA1.size()); - - List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); - assertTrue(appsInA.contains(application_0.getApplicationAttemptId())); - assertTrue(appsInA.contains(application_1.getApplicationAttemptId())); - assertEquals(2, appsInA.size()); - - List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); - assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId())); - assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId())); - assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId())); - assertEquals(3, appsInRoot.size()); - - Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue")); - } + @Test + public void testGetAppsInQueue() throws Exception { + Application application_0 = new Application("user_0", "a1", resourceManager); + application_0.submit(); + + Application application_1 = new Application("user_0", "a2", resourceManager); + application_1.submit(); + + Application application_2 = new Application("user_0", "b2", resourceManager); + application_2.submit(); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(application_0.getApplicationAttemptId())); + assertTrue(appsInA.contains(application_1.getApplicationAttemptId())); + assertEquals(2, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId())); + assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId())); + assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId())); + assertEquals(3, appsInRoot.size()); + + Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue")); + } @Test public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 295a31a..9dcab2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -94,8 +94,6 @@ public class TestChildQueueOrder { Resources.createResource(16*GB, 32)); when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); - when(csContext.getApplicationComparator()). - thenReturn(CapacityScheduler.applicationComparator); when(csContext.getNonPartitionedQueueComparator()). thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index f419528..0efadc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -144,8 +144,6 @@ public class TestLeafQueue { thenReturn(Resources.createResource(16*GB, 32)); when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); - when(csContext.getApplicationComparator()). - thenReturn(CapacityScheduler.applicationComparator); when(csContext.getNonPartitionedQueueComparator()). thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). @@ -1910,7 +1908,7 @@ public class TestLeafQueue { // before reinitialization assertEquals(2, e.getNumActiveApplications()); - assertEquals(1, e.pendingApplications.size()); + assertEquals(1, e.getNumPendingApplications()); csConf.setDouble(CapacitySchedulerConfiguration .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, @@ -1927,7 +1925,7 @@ public class TestLeafQueue { // after reinitialization assertEquals(3, e.getNumActiveApplications()); - assertEquals(0, e.pendingApplications.size()); + assertEquals(0, e.getNumPendingApplications()); } @Test (timeout = 30000) @@ -1991,7 +1989,7 @@ public class TestLeafQueue { // before updating cluster resource assertEquals(2, e.getNumActiveApplications()); - assertEquals(1, e.pendingApplications.size()); + assertEquals(1, e.getNumPendingApplications()); Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32); e.updateClusterResource(clusterResource, @@ -1999,7 +1997,7 @@ public class TestLeafQueue { // after updating cluster resource assertEquals(3, e.getNumActiveApplications()); - assertEquals(0, e.pendingApplications.size()); + assertEquals(0, e.getNumPendingApplications()); } public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) { @@ -2350,8 +2348,9 @@ public class TestLeafQueue { LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); - a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>()); - + a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>()); + a.setPendingAppsOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>()); + String host_0_0 = "127.0.0.1"; String rack_0 = "rack_0"; FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB); @@ -2367,14 +2366,14 @@ public class TestLeafQueue { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext)); + mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3))); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), spyRMContext)); + mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5))); a.submitApplicationAttempt(app_1, user_0); Priority priority = TestUtils.createMockPriority(1); @@ -2392,36 +2391,34 @@ public class TestLeafQueue { TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory)); app_1.updateResourceRequests(app_1_requests_0); - - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + + // app_1 will get containers as it has high priority a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); - + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + app_0_requests_0.clear(); app_0_requests_0.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory)); app_0.updateResourceRequests(app_0_requests_0); - + app_1_requests_0.clear(); app_1_requests_0.add( TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory)); app_1.updateResourceRequests(app_1_requests_0); - - //Even thought it already has more resources, app_0 will still get - //assigned first - a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); - Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); - - //and only then will app_1 + + //app_1 will still get assigned first as priority is more. a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); - } - + //and only then will app_2 + a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); +} @Test public void testConcurrentAccess() throws Exception { YarnConfiguration conf = new YarnConfiguration(); @@ -2500,6 +2497,7 @@ public class TestLeafQueue { new FairOrderingPolicy<FiCaSchedulerApp>(); a.setOrderingPolicy(schedulingOrder); + a.setPendingAppsOrderingPolicy(new FairOrderingPolicy<FiCaSchedulerApp>()); String host_0_0 = "127.0.0.1"; String rack_0 = "rack_0"; @@ -2542,6 +2540,7 @@ public class TestLeafQueue { true, priority, recordFactory)); app_1.updateResourceRequests(app_1_requests_0); + // app_0 will get containers as its submitted first. a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 4deaaae..ef35093 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -90,8 +90,6 @@ public class TestParentQueue { Resources.createResource(16*GB, 32)); when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); - when(csContext.getApplicationComparator()). - thenReturn(CapacityScheduler.applicationComparator); when(csContext.getNonPartitionedQueueComparator()). thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 66ad3a8..6a0b11b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -112,8 +112,6 @@ public class TestReservations { Resources.createResource(16 * GB, 12)); when(csContext.getClusterResource()).thenReturn( Resources.createResource(100 * 16 * GB, 100 * 12)); - when(csContext.getApplicationComparator()).thenReturn( - CapacityScheduler.applicationComparator); when(csContext.getNonPartitionedQueueComparator()).thenReturn( CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);