Repository: hadoop Updated Branches: refs/heads/trunk 37f4696a9 -> bc93ac229
YARN-7139. FairScheduler: finished applications are always restored to default queue. Contributed by Wilfred Spiegelenburg. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc93ac22 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc93ac22 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc93ac22 Branch: refs/heads/trunk Commit: bc93ac229e17b1be440052217e51820b95c179ec Parents: 37f4696 Author: Miklos Szegedi <szege...@apache.org> Authored: Thu Jan 18 16:03:53 2018 -0800 Committer: Miklos Szegedi <szege...@apache.org> Committed: Thu Jan 18 17:43:47 2018 -0800 ---------------------------------------------------------------------- .../scheduler/fair/FairScheduler.java | 15 ++++++-- .../ParameterizedSchedulerTestBase.java | 8 ++++ .../TestWorkPreservingRMRestart.java | 39 ++++++++++++++++++++ .../scheduler/fair/FairSchedulerTestBase.java | 22 +++++++---- .../scheduler/fair/TestFSAppAttempt.java | 8 ++-- 5 files changed, 77 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc93ac22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index b31ab07..e2a62ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -495,15 +495,22 @@ public class FairScheduler extends applications.put(applicationId, application); queue.getMetrics().submitApp(user); - LOG.info("Accepted application " + applicationId + " from user: " + user - + ", in queue: " + queue.getName() - + ", currently num of applications: " + applications.size()); + LOG.info("Accepted application " + applicationId + " from user: " + user + + ", in queue: " + queue.getName() + + ", currently num of applications: " + applications.size()); if (isAppRecovering) { if (LOG.isDebugEnabled()) { LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); } - } else{ + } else { + // During tests we do not always have an application object, handle + // it here but we probably should fix the tests + if (rmApp != null && rmApp.getApplicationSubmissionContext() != null) { + // Before we send out the event that the app is accepted is + // to set the queue in the submissionContext (needed on restore etc) + rmApp.getApplicationSubmissionContext().setQueue(queue.getName()); + } rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc93ac22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java index 9a29a89..4de16dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ParameterizedSchedulerTestBase.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.junit.After; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -111,6 +112,13 @@ public abstract class ParameterizedSchedulerTestBase { conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10); } + @After + public void tearDown() { + if (schedulerType == SchedulerType.FAIR) { + (new File(FS_ALLOC_FILE)).delete(); + } + } + public SchedulerType getSchedulerType() { return schedulerType; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc93ac22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index efde781..e4c83e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -1688,4 +1688,43 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase // *********** check appSchedulingInfo state *********** assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId()); } + + // Apps already completed before RM restart. Make sure we restore the queue + // correctly + @Test(timeout = 20000) + public void testFairSchedulerCompletedAppsQueue() throws Exception { + if (getSchedulerType() != SchedulerType.FAIR) { + return; + } + + rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app, rm1, nm1); + MockRM.finishAMAndVerifyAppState(app, rm1, nm1, am1); + + String fsQueueContext = app.getApplicationSubmissionContext().getQueue(); + String fsQueueApp = app.getQueue(); + assertEquals("Queue in app not equal to submission context", fsQueueApp, + fsQueueContext); + RMAppAttempt rmAttempt = app.getCurrentAppAttempt(); + assertNotNull("No AppAttempt found", rmAttempt); + + rm2 = new MockRM(conf, rm1.getRMStateStore()); + rm2.start(); + + RMApp recoveredApp = + rm2.getRMContext().getRMApps().get(app.getApplicationId()); + RMAppAttempt rmAttemptRecovered = recoveredApp.getCurrentAppAttempt(); + assertNotNull("No AppAttempt found after recovery", rmAttemptRecovered); + String fsQueueContextRecovered = + recoveredApp.getApplicationSubmissionContext().getQueue(); + String fsQueueAppRecovered = recoveredApp.getQueue(); + assertEquals(RMAppState.FINISHED, recoveredApp.getState()); + assertEquals("Recovered app queue is not the same as context queue", + fsQueueAppRecovered, fsQueueContextRecovered); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc93ac22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index af4e1dd..5f29186 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -163,16 +163,18 @@ public class FairSchedulerTestBase { protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { - ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, + this.ATTEMPT_ID++); scheduler.addApplication(id.getApplicationId(), queueId, userId, false); // This conditional is for testAclSubmitApplication where app is rejected // and no app is added. - if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { + if (scheduler.getSchedulerApplications(). + containsKey(id.getApplicationId())) { scheduler.addApplicationAttempt(id, false, false); } List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, numContainers, true); + ResourceRequest request = createResourceRequest(memory, vcores, + ResourceRequest.ANY, priority, numContainers, true); ask.add(request); RMApp rmApp = mock(RMApp.class); @@ -180,9 +182,11 @@ public class FairSchedulerTestBase { when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( new RMAppAttemptMetrics(id, resourceManager.getRMContext())); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext); + when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext); Container container = mock(Container.class); when(rmAppAttempt.getMasterContainer()).thenReturn(container); resourceManager.getRMContext().getRMApps() @@ -210,9 +214,11 @@ public class FairSchedulerTestBase { when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt); when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn( new RMAppAttemptMetrics(id,resourceManager.getRMContext())); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext); + when(rmApp.getApplicationSubmissionContext()).thenReturn(submissionContext); resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); @@ -275,9 +281,11 @@ public class FairSchedulerTestBase { RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class); when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); when(app.getCurrentAppAttempt()).thenReturn(attempt); - ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); when(submissionContext.getUnmanagedAM()).thenReturn(false); when(attempt.getSubmissionContext()).thenReturn(submissionContext); + when(app.getApplicationSubmissionContext()).thenReturn(submissionContext); resourceManager.getRMContext().getRMApps() .put(attemptId.getApplicationId(), app); return app; http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc93ac22/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java index 46187d9..51ffd23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java @@ -315,11 +315,11 @@ public class TestFSAppAttempt extends FairSchedulerTestBase { List<String> blacklistAdditions = new ArrayList<String>(1); List<String> blacklistRemovals = new ArrayList<String>(1); blacklistAdditions.add(n1.getNodeName()); - app.updateBlacklist(blacklistAdditions, blacklistRemovals); - app.getQueue().setFairShare(clusterResource); FSAppAttempt spyApp = spy(app); doReturn(false) .when(spyApp).isWaitingForAMContainer(); + spyApp.updateBlacklist(blacklistAdditions, blacklistRemovals); + spyApp.getQueue().setFairShare(clusterResource); assertTrue(spyApp.isPlaceBlacklisted(n1.getNodeName())); assertFalse(spyApp.isPlaceBlacklisted(n2.getNodeName())); assertEquals(n2.getUnallocatedResource(), spyApp.getHeadroom()); @@ -327,7 +327,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase { blacklistAdditions.clear(); blacklistAdditions.add(n2.getNodeName()); blacklistRemovals.add(n1.getNodeName()); - app.updateBlacklist(blacklistAdditions, blacklistRemovals); + spyApp.updateBlacklist(blacklistAdditions, blacklistRemovals); assertFalse(spyApp.isPlaceBlacklisted(n1.getNodeName())); assertTrue(spyApp.isPlaceBlacklisted(n2.getNodeName())); assertEquals(n1.getUnallocatedResource(), spyApp.getHeadroom()); @@ -335,7 +335,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase { blacklistAdditions.clear(); blacklistRemovals.clear(); blacklistRemovals.add(n2.getNodeName()); - app.updateBlacklist(blacklistAdditions, blacklistRemovals); + spyApp.updateBlacklist(blacklistAdditions, blacklistRemovals); assertFalse(spyApp.isPlaceBlacklisted(n1.getNodeName())); assertFalse(spyApp.isPlaceBlacklisted(n2.getNodeName())); assertEquals(clusterResource, spyApp.getHeadroom()); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org