Repository: hadoop Updated Branches: refs/heads/trunk fd8065a76 -> 109e528ef
YARN-4479. Change CS LeafQueue pendingOrderingPolicy to honor recovered apps. Contributed by Rohith Sharma K S Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/109e528e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/109e528e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/109e528e Branch: refs/heads/trunk Commit: 109e528ef5d8df07443373751266b4417acc981a Parents: fd8065a Author: Jian He <[email protected]> Authored: Fri Jan 8 15:51:10 2016 -0800 Committer: Jian He <[email protected]> Committed: Fri Jan 8 15:51:10 2016 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 2 + .../scheduler/SchedulerApplicationAttempt.java | 9 + .../scheduler/capacity/CapacityScheduler.java | 2 +- .../scheduler/capacity/LeafQueue.java | 63 ++++++- .../scheduler/common/fica/FiCaSchedulerApp.java | 5 +- .../capacity/TestApplicationPriority.java | 164 +++++++++++++++++++ .../scheduler/capacity/TestLeafQueue.java | 6 +- 8 files changed, 241 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 00d31d8..b896b06 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1229,6 +1229,9 @@ Release 2.8.0 - UNRELEASED YARN-4546. ResourceManager crash due to scheduling opportunity overflow. (Jason Lowe via junping_du) + YARN-4479. Change CS LeafQueue pendingOrderingPolicy to honor recovered apps. 
+ (Rohith Sharma K S via jianhe) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 2d0d5d6..c79a35e 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -205,6 +205,8 @@ <Field name="userLimitFactor" /> <Field name="maxAMResourcePerQueuePercent" /> <Field name="lastClusterResource" /> + <Field name="pendingOrderingPolicy" /> + <Field name="pendingOPForRecoveredApps" /> </Or> <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index c1f1c3d..b43c106 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -109,6 +109,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { private LogAggregationContext logAggregationContext; private volatile Priority appPriority = null; + private boolean isAttemptRecovering; protected ResourceUsage attemptResourceUsage = new ResourceUsage(); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); @@ -967,6 +968,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { // queue's resource usage for specific partition } + public boolean isAttemptRecovering() { + return isAttemptRecovering; + } + + protected void setAttemptRecovering(boolean isRecovering) { + this.isAttemptRecovering = isRecovering; + } + public static enum AMState { UNMANAGED("User launched the Application Master, since it's unmanaged. "), INACTIVATED("Application is added to the scheduler and is not yet activated. 
"), http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c7b73fb..b3b9713 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -783,7 +783,7 @@ public class CapacityScheduler extends FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, application.getUser(), queue, queue.getActiveUsersManager(), rmContext, - application.getPriority()); + application.getPriority(), isAttemptRecovering); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt( application.getCurrentAppAttempt()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5c3f4b9..ff7d04f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -95,6 +95,9 @@ public class LeafQueue extends AbstractCSQueue { private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null; + // Always give preference to this while activating the application attempts. + private OrderingPolicy<FiCaSchedulerApp> pendingOPForRecoveredApps = null; + private volatile float minimumAllocationFactor; private Map<String, User> users = new HashMap<String, User>(); @@ -156,6 +159,8 @@ public class LeafQueue extends AbstractCSQueue { setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath())); setPendingAppsOrderingPolicy(conf .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath())); + setPendingAppsOrderingPolicyRecovery(conf + .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath())); userLimit = conf.getUserLimit(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath()); @@ -320,7 +325,8 @@ public class LeafQueue extends AbstractCSQueue { } public synchronized int getNumPendingApplications() { - return pendingOrderingPolicy.getNumSchedulableEntities(); + return pendingOrderingPolicy.getNumSchedulableEntities() + + pendingOPForRecoveredApps.getNumSchedulableEntities(); } public synchronized int getNumActiveApplications() { @@ -599,9 +605,19 @@ 
public class LeafQueue extends AbstractCSQueue { Map<String, Resource> userAmPartitionLimit = new HashMap<String, Resource>(); - for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy() - .getAssignmentIterator(); i.hasNext();) { - FiCaSchedulerApp application = i.next(); + activateApplications(getPendingAppsOrderingPolicyRecovery() + .getAssignmentIterator(), amPartitionLimit, userAmPartitionLimit); + + activateApplications( + getPendingAppsOrderingPolicy().getAssignmentIterator(), + amPartitionLimit, userAmPartitionLimit); + } + + private synchronized void activateApplications( + Iterator<FiCaSchedulerApp> fsApp, Map<String, Resource> amPartitionLimit, + Map<String, Resource> userAmPartitionLimit) { + while (fsApp.hasNext()) { + FiCaSchedulerApp application = fsApp.next(); ApplicationId applicationId = application.getApplicationId(); // Get the am-node-partition associated with each application @@ -692,7 +708,7 @@ public class LeafQueue extends AbstractCSQueue { metrics.incAMUsed(application.getUser(), application.getAMResource(partitionName)); metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); - i.remove(); + fsApp.remove(); LOG.info("Application " + applicationId + " from user: " + application.getUser() + " activated in queue: " + getQueueName()); } @@ -702,7 +718,11 @@ public class LeafQueue extends AbstractCSQueue { User user) { // Accept user.submitApplication(); - getPendingAppsOrderingPolicy().addSchedulableEntity(application); + if (application.isAttemptRecovering()) { + getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application); + } else { + getPendingAppsOrderingPolicy().addSchedulableEntity(application); + } applicationAttemptMap.put(application.getApplicationAttemptId(), application); // Activate applications @@ -742,7 +762,11 @@ public class LeafQueue extends AbstractCSQueue { boolean wasActive = orderingPolicy.removeSchedulableEntity(application); if (!wasActive) { - 
pendingOrderingPolicy.removeSchedulableEntity(application); + if (application.isAttemptRecovering()) { + pendingOPForRecoveredApps.removeSchedulableEntity(application); + } else { + pendingOrderingPolicy.removeSchedulableEntity(application); + } } else { queueUsage.decAMUsed(partitionName, application.getAMResource(partitionName)); @@ -1491,7 +1515,11 @@ public class LeafQueue extends AbstractCSQueue { * Obtain (read-only) collection of pending applications. */ public Collection<FiCaSchedulerApp> getPendingApplications() { - return pendingOrderingPolicy.getSchedulableEntities(); + Collection<FiCaSchedulerApp> pendingApps = + new ArrayList<FiCaSchedulerApp>(); + pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities()); + pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities()); + return pendingApps; } /** @@ -1535,6 +1563,10 @@ public class LeafQueue extends AbstractCSQueue { @Override public synchronized void collectSchedulerApplications( Collection<ApplicationAttemptId> apps) { + for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps + .getSchedulableEntities()) { + apps.add(pendingApp.getApplicationAttemptId()); + } for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy .getSchedulableEntities()) { apps.add(pendingApp.getApplicationAttemptId()); @@ -1670,6 +1702,21 @@ public class LeafQueue extends AbstractCSQueue { this.pendingOrderingPolicy = pendingOrderingPolicy; } + public synchronized OrderingPolicy<FiCaSchedulerApp> + getPendingAppsOrderingPolicyRecovery() { + return pendingOPForRecoveredApps; + } + + public synchronized void setPendingAppsOrderingPolicyRecovery( + OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) { + if (null != this.pendingOPForRecoveredApps) { + pendingOrderingPolicyRecovery + .addAllSchedulableEntities(this.pendingOPForRecoveredApps + .getSchedulableEntities()); + } + this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery; + } + /* * Holds shared values used by all applications in * 
the queue to calculate headroom on demand http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index c9c792e..4b88415 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -99,12 +99,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { this(applicationAttemptId, user, queue, activeUsersManager, rmContext, - Priority.newInstance(0)); + Priority.newInstance(0), false); } public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, - RMContext rmContext, Priority appPriority) { + RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); @@ -129,6 +129,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { 
setAppAMNodePartitionName(partition); setAMResource(partition, amResource); setPriority(appPriority); + setAttemptRecovering(isAttemptRecovering); scheduler = rmContext.getScheduler(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 169e9f6..2ad805a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -567,4 +569,166 @@ public class TestApplicationPriority { Assert.assertEquals(6, 
schedulerAppAttemptApp1.getLiveContainers().size()); rm.stop(); } + + /** + * <p> + * Test case verifies the order of applications activated after RM Restart. + * </p> + * <li>App-1 and app-2 are submitted, scheduled and running with priorities + * 5 and 6 respectively</li> + * <li>App-3 is submitted and scheduled with priority 7. It + * is not activated since AMResourceLimit is reached</li> + * <li>RM restarted</li> + * <li>App-1 gets activated regardless of AMResourceLimit</li> + * <li>App-2 and app-3 are put in pendingOrderingPolicy</li> + * <li>After NM registration, one of the pending apps is activated</li> + * <p> + * Expected Output : App-2 must get activated since app-2 was running earlier + * </p> + * @throws Exception + */ + @Test + public void testOrderOfActivatingThePriorityApplicationOnRMRestart() + throws Exception { + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + final DrainDispatcher dispatcher = new DrainDispatcher(); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + + MockNM nm1 = + new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService()); + nm1.registerNode(); + + dispatcher.await(); + + ResourceScheduler scheduler = rm1.getRMContext().getScheduler(); + LeafQueue defaultQueue = + (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); + int memory = defaultQueue.getAMResourceLimit().getMemory() / 2; + + // App-1 with priority 5 submitted and running + Priority appPriority1 = Priority.newInstance(5); + RMApp app1 = 
rm1.submitApp(memory, appPriority1); + MockAM am1 = MockRM.launchAM(app1, rm1, nm1); + am1.registerAppAttempt(); + + // App-2 with priority 6 submitted and running + Priority appPriority2 = Priority.newInstance(6); + RMApp app2 = rm1.submitApp(memory, appPriority2); + MockAM am2 = MockRM.launchAM(app2, rm1, nm1); + am2.registerAppAttempt(); + + dispatcher.await(); + Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); + Assert.assertEquals(0, defaultQueue.getNumPendingApplications()); + + // App-3 with priority 7 submitted and scheduled. But not activated since + // AMResourceLimit threshold + Priority appPriority3 = Priority.newInstance(7); + RMApp app3 = rm1.submitApp(memory, appPriority3); + + dispatcher.await(); + Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); + Assert.assertEquals(1, defaultQueue.getNumPendingApplications()); + + Iterator<FiCaSchedulerApp> iterator = + defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator(); + FiCaSchedulerApp fcApp2 = iterator.next(); + Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(), + fcApp2.getApplicationAttemptId()); + + FiCaSchedulerApp fcApp1 = iterator.next(); + Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(), + fcApp1.getApplicationAttemptId()); + + iterator = defaultQueue.getPendingApplications().iterator(); + FiCaSchedulerApp fcApp3 = iterator.next(); + Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(), + fcApp3.getApplicationAttemptId()); + + final DrainDispatcher dispatcher1 = new DrainDispatcher(); + // create new RM to represent restart and recover state + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher1; + } + }; + + // start new RM + rm2.start(); + // change NM to point to new RM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + // Verify RM Apps after this restart + Assert.assertEquals(3, rm2.getRMContext().getRMApps().size()); 
+ + dispatcher1.await(); + scheduler = rm2.getRMContext().getScheduler(); + defaultQueue = + (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); + + // wait for all applications to get added to scheduler + int count = 5; + while (count-- > 0) { + if ((defaultQueue.getNumActiveApplications() + defaultQueue + .getNumPendingApplications()) == 3) { + break; + } + Thread.sleep(500); + } + + // Before NM registration, AMResourceLimit threshold is 0. So 1st + // applications get activated nevertheless of AMResourceLimit threshold + // Two applications are in pending + Assert.assertEquals(1, defaultQueue.getNumActiveApplications()); + Assert.assertEquals(2, defaultQueue.getNumPendingApplications()); + + // NM resync to new RM + nm1.registerNode(); + dispatcher1.await(); + + // wait for activating one applications + count = 5; + while (count-- > 0) { + if (defaultQueue.getOrderingPolicy().getSchedulableEntities().size() == 2) { + break; + } + Thread.sleep(500); + } + + // verify for order of activated applications iterator + iterator = + defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator(); + fcApp2 = iterator.next(); + Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(), + fcApp2.getApplicationAttemptId()); + + fcApp1 = iterator.next(); + Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(), + fcApp1.getApplicationAttemptId()); + + // verify for pending application iterator. 
It should be app-3 attempt + iterator = defaultQueue.getPendingApplications().iterator(); + fcApp3 = iterator.next(); + Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(), + fcApp3.getApplicationAttemptId()); + + rm2.stop(); + rm1.stop(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/109e528e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 479e25a..d4b8dcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -2413,14 +2413,16 @@ public class TestLeafQueue { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3))); + mock(ActiveUsersManager.class), spyRMContext, + Priority.newInstance(3), false)); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5))); + 
mock(ActiveUsersManager.class), spyRMContext, + Priority.newInstance(5), false)); a.submitApplicationAttempt(app_1, user_0); Priority priority = TestUtils.createMockPriority(1);
