Repository: hadoop Updated Branches: refs/heads/branch-2.7 1903665b2 -> fd112535c
YARN-5333. Some recovered apps are put into default queue when RM HA. Contributed by Jun Gong. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fd112535 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fd112535 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fd112535 Branch: refs/heads/branch-2.7 Commit: fd112535c81bc08d305503284aaa6c89f30518dd Parents: 1903665 Author: Sunil G <[email protected]> Authored: Thu Jun 1 11:19:37 2017 +0530 Committer: Sunil G <[email protected]> Committed: Thu Jun 1 11:19:37 2017 +0530 ---------------------------------------------------------------------- .../server/resourcemanager/AdminService.java | 103 ++++++++++++------- .../scheduler/fair/TestFairScheduler.java | 70 ++++++++++++- 2 files changed, 134 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd112535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 5248adb..306bd86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -302,15 +302,7 @@ public class AdminService extends CompositeService implements UserGroupInformation user = checkAccess("transitionToActive"); checkHaStateChange(reqInfo); - try { - rm.transitionToActive(); - } catch (Exception e) { - RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", - "", "RMHAProtocolService", - "Exception transitioning to active"); - throw new ServiceFailedException( - "Error when transitioning to Active mode", e); - } + try { // call all refresh*s for active RM to get the updated configurations. refreshAll(); @@ -320,10 +312,22 @@ public class AdminService extends CompositeService implements .getDispatcher() .getEventHandler() .handle( - new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e)); + new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, + e)); + throw new ServiceFailedException( + "Error on refreshAll during transition to Active", e); + } + + try { + rm.transitionToActive(); + } catch (Exception e) { + RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", + "", "RM", + "Exception transitioning to active"); throw new ServiceFailedException( - "Error on refreshAll during transistion to Active", e); + "Error when transitioning to Active mode", e); } + RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", "RMHAProtocolService"); } @@ -378,12 +382,7 @@ public class AdminService extends CompositeService implements RefreshQueuesResponse response = recordFactory.newRecordInstance(RefreshQueuesResponse.class); try { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); - // refresh the reservation system - ReservationSystem rSystem = rmContext.getReservationSystem(); - if (rSystem != null) { - rSystem.reinitialize(getConfig(), rmContext); - } + refreshQueues(); RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService"); return response; @@ -392,6 +391,15 @@ public class AdminService extends CompositeService implements } } + private void refreshQueues() throws IOException, YarnException { + rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); + // refresh the reservation system + ReservationSystem rSystem = rmContext.getReservationSystem(); + if (rSystem != null) { + rSystem.reinitialize(getConfig(), rmContext); + } + } + @Override public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws YarnException, StandbyException { @@ -414,6 +422,13 @@ public class AdminService extends CompositeService implements } } + private void refreshNodes() throws IOException, YarnException { + Configuration conf = + getConfiguration(new Configuration(false), + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + rmContext.getNodesListManager().refreshNodes(conf); + } + @Override public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( RefreshSuperUserGroupsConfigurationRequest request) @@ -423,6 +438,16 @@ public class AdminService extends CompositeService implements checkRMStatus(user.getShortUserName(), argName, "refresh super-user-groups."); + refreshSuperUserGroupsConfiguration(); + RMAuditLogger.logSuccess(user.getShortUserName(), + argName, "AdminService"); + + return recordFactory.newRecordInstance( + RefreshSuperUserGroupsConfigurationResponse.class); + } + + private void refreshSuperUserGroupsConfiguration() + throws IOException, YarnException { // Accept hadoop common configs in core-site.xml as well as RM specific // configurations in yarn-site.xml Configuration conf = @@ -431,11 +456,6 @@ public class AdminService extends CompositeService implements YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); RMServerUtils.processRMProxyUsersConf(conf); ProxyUsers.refreshSuperUserGroupsConfiguration(conf); - RMAuditLogger.logSuccess(user.getShortUserName(), - argName, "AdminService"); - - return recordFactory.newRecordInstance( - RefreshSuperUserGroupsConfigurationResponse.class); } @Override @@ -447,16 +467,20 @@ public class AdminService extends CompositeService implements checkRMStatus(user.getShortUserName(), argName, "refresh user-groups."); - Groups.getUserToGroupsMappingService( - getConfiguration(new Configuration(false), - YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh(); - - RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService"); + refreshUserToGroupsMappings(); + RMAuditLogger.logSuccess(user.getShortUserName(), argName, + "AdminService"); return recordFactory.newRecordInstance( RefreshUserToGroupsMappingsResponse.class); } + private void refreshUserToGroupsMappings() throws IOException, YarnException { + Groups.getUserToGroupsMappingService( + getConfiguration(new Configuration(false), + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh(); + } + @Override public RefreshAdminAclsResponse refreshAdminAcls( RefreshAdminAclsRequest request) throws YarnException, IOException { @@ -499,6 +523,14 @@ public class AdminService extends CompositeService implements checkRMStatus(user.getShortUserName(), argName, "refresh Service ACLs."); + refreshServiceAcls(); + RMAuditLogger.logSuccess(user.getShortUserName(), argName, + "AdminService"); + + return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); + } + + private void refreshServiceAcls() throws IOException, YarnException { PolicyProvider policyProvider = RMPolicyProvider.getInstance(); Configuration conf = getConfiguration(new Configuration(false), @@ -510,10 +542,6 @@ public class AdminService extends CompositeService implements conf, policyProvider); rmContext.getResourceTrackerService().refreshServiceAcls( conf, policyProvider); - - RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService"); - - return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); } private synchronized void refreshServiceAcls(Configuration configuration, @@ -597,16 +625,15 @@ public class AdminService extends CompositeService implements private void refreshAll() throws ServiceFailedException { try { - refreshQueues(RefreshQueuesRequest.newInstance()); - refreshNodes(RefreshNodesRequest.newInstance()); - refreshSuperUserGroupsConfiguration( - RefreshSuperUserGroupsConfigurationRequest.newInstance()); - refreshUserToGroupsMappings( - RefreshUserToGroupsMappingsRequest.newInstance()); + checkAcls("refreshAll"); + refreshQueues(); + refreshNodes(); + refreshSuperUserGroupsConfiguration(); + refreshUserToGroupsMappings(); if (getConfig().getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - refreshServiceAcls(RefreshServiceAclsRequest.newInstance()); + refreshServiceAcls(); } } catch (Exception ex) { throw new ServiceFailedException(ex.getMessage()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd112535/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 0989a8d..7b3c17f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -52,6 +52,7 @@ import javax.xml.parsers.ParserConfigurationException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.yarn.MockApps; @@ -73,9 +74,13 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -83,6 +88,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -95,7 +101,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtil import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -4524,4 +4529,67 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals("Incorrect number of perf metrics", 1, collector.getRecords().size()); } + + @Test(timeout = 120000) + public void testRefreshQueuesWhenRMHA() throws Exception { + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); + conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + HAServiceProtocol.StateChangeRequestInfo requestInfo = + new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + // 1. start a standby RM, file 'ALLOC_FILE' is empty, so there is no queues + MockRM rm1 = new MockRM(conf, null); + rm1.init(conf); + rm1.start(); + rm1.getAdminService().transitionToStandby(requestInfo); + + // 2. add a new queue "test_queue" + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"test_queue\">"); + out.println(" <maxRunningApps>3</maxRunningApps>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // 3. start a active RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.init(conf); + rm2.start(); + + MockNM nm = + new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm.registerNode(); + + rm2.getAdminService().transitionToActive(requestInfo); + + // 4. submit a app to the new added queue "test_queue" + RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue"); + RMAppAttempt attempt0 = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId()); + am0.registerAppAttempt(); + assertEquals("root.test_queue", app.getQueue()); + + // 5. Transit rm1 to active, recover app + ((RMContextImpl) rm1.getRMContext()).setStateStore(memStore); + rm1.getAdminService().transitionToActive(requestInfo); + rm1.drainEvents(); + assertEquals(1, rm1.getRMContext().getRMApps().size()); + RMApp recoveredApp = + rm1.getRMContext().getRMApps().values().iterator().next(); + assertEquals("root.test_queue", recoveredApp.getQueue()); + + rm1.stop(); + rm2.stop(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
