slfan1989 commented on code in PR #4897:
URL: https://github.com/apache/hadoop/pull/4897#discussion_r976164888
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java:
##########
@@ -1242,4 +1255,181 @@ public void testRemoveAppFromRegistryApplicationFailed()
return null;
});
}
+
+ /**
+  * Exercises interceptor recovery while one secondary sub-cluster is offline.
+  *
+  * <p>Inside a {@code ugi.doAs} action the test registers SC-1, SC-2 and the
+  * home sub-cluster, registers the application and obtains containers, takes
+  * SC-1 out of service, rebuilds and recovers the interceptor, checks that
+  * SC-2 is the only UAM left in the pool, re-registers after the expected
+  * {@code ApplicationMasterNotRegisteredException}, releases containers and
+  * finishes the application.
+  *
+  * @param registryObj registry handed to the AMRMProxy application contexts
+  * @throws IOException declared by the UGI calls made here
+  * @throws InterruptedException declared by the UGI calls made here
+  */
+ public void testRecoverWithBadSubCluster(final RegistryOperations registryObj)
+     throws IOException, InterruptedException {
+
+   UserGroupInformation ugi =
+       interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+   // Prepare a list of subclusters
+   List<SubClusterId> subClusterIds = new ArrayList<>();
+   SubClusterId sc1 = SubClusterId.newInstance("SC-1");
+   SubClusterId sc2 = SubClusterId.newInstance("SC-2");
+   SubClusterId homeSC = SubClusterId.newInstance(HOME_SC_ID);
+   subClusterIds.add(sc1);
+   subClusterIds.add(sc2);
+   subClusterIds.add(homeSC);
+
+   // Prepare AMRMProxy Context (used later for the recovered interceptor)
+   AMRMProxyApplicationContext appContext =
+       new AMRMProxyApplicationContextImpl(nmContext,
+       getConf(), attemptId, "test-user", null, null, null, registryObj);
+
+   // Prepare RegisterApplicationMasterRequest
+   RegisterApplicationMasterRequest registerReq =
+       Records.newRecord(RegisterApplicationMasterRequest.class);
+   registerReq.setHost(Integer.toString(testAppId));
+   registerReq.setRpcPort(testAppId);
+   registerReq.setTrackingUrl("");
+
+   ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+     // Step1. Prepare subClusters SC-1, SC-2, HomeSC and Interceptor
+     initSubClusterAndInterceptor(subClusterIds, registryObj);
+
+     // Step2. Register Application And Assign Containers
+     List<Container> containers =
+         registerApplicationAndAssignContainers(registerReq);
+
+     // Step3. Offline SC-1 cluster
+     offlineSubClusterSC1(sc1);
+
+     // Step4. Recover ApplicationMaster
+     recoverApplicationMaster(appContext);
+
+     // Step5. We recovered ApplicationMaster.
+     // SC-1 was offline, SC-2 was recovered at this time,
+     // UnmanagedAMPool.size=1 and only SC-2
+     UnmanagedAMPoolManager unmanagedAMPoolManager =
+         interceptor.getUnmanagedAMPool();
+     Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+     Assert.assertNotNull(allUAMIds);
+     // assertEquals reports the actual size on failure
+     Assert.assertEquals(1, allUAMIds.size());
+     Assert.assertTrue(allUAMIds.contains(sc2.getId()));
+
+     // Step6. The first allocate call expects a fail-over exception and
+     // re-register.
+     AllocateRequest allocateRequest =
+         Records.newRecord(AllocateRequest.class);
+     allocateRequest.setResponseId(0);
+     LambdaTestUtils.intercept(ApplicationMasterNotRegisteredException.class,
+         "AMRMProxy just restarted and recovered for " + this.attemptId +
+         ". AM should re-register and full re-send pending requests.",
+         () -> interceptor.allocate(allocateRequest));
+     interceptor.registerApplicationMaster(registerReq);
+
+     // Step7. release Containers
+     releaseContainers(containers, sc1);
+
+     // Step8. finish application
+     finishApplication();
+
+     return null;
+   });
+ }
+
+ /**
+  * Registers every given sub-cluster, then replaces {@code interceptor} with
+  * a new {@code TestableFederationInterceptor} that is initialized against a
+  * new application context and has {@code cleanupRegistry()} invoked on it.
+  *
+  * @param subClusterIds sub-clusters to register
+  * @param registryObj registry handed to the new application context
+  * @throws YarnException declared for the calls made here
+  */
+ private void initSubClusterAndInterceptor(List<SubClusterId> subClusterIds,
+     RegistryOperations registryObj) throws YarnException {
+   // Register each requested sub-cluster (SC-1, SC-2, HomeSC)
+   for (SubClusterId scId : subClusterIds) {
+     registerSubCluster(scId);
+   }
+
+   // Swap in a brand-new interceptor bound to a new application context
+   interceptor = new TestableFederationInterceptor();
+   AMRMProxyApplicationContext freshContext =
+       new AMRMProxyApplicationContextImpl(nmContext, getConf(), attemptId,
+       "test-user", null, null, null, registryObj);
+   interceptor.init(freshContext);
+   interceptor.cleanupRegistry();
+ }
+
+ /**
+  * Registers the application master through the interceptor, obtains
+  * containers via {@code getContainersAndAssert}, and verifies that the UAM
+  * pool went from empty to exactly SC-1 and SC-2.
+  *
+  * @param registerReq registration request sent to the interceptor
+  * @return the containers returned by {@code getContainersAndAssert}
+  * @throws Exception declared for the interceptor and helper calls made here
+  */
+ private List<Container> registerApplicationAndAssignContainers(
+     RegisterApplicationMasterRequest registerReq) throws Exception {
+
+   // Register HomeSC
+   RegisterApplicationMasterResponse registerResponse =
+       interceptor.registerApplicationMaster(registerReq);
+   Assert.assertNotNull(registerResponse);
+
+   // We only registered HomeSC, so UnmanagedAMPoolSize should be empty
+   Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+   // We assign 3 Containers to each cluster
+   // NOTE(review): the "* 3" presumably is the sub-cluster count
+   // (SC-1, SC-2, HomeSC) - confirm against getContainersAndAssert
+   int numberOfContainers = 3;
+   List<Container> containers =
+       getContainersAndAssert(numberOfContainers, numberOfContainers * 3);
+
+   // At this point, UnmanagedAMPoolSize should be equal to 2 and should
+   // contain SC-1, SC-2
+   Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+   UnmanagedAMPoolManager unmanagedAMPoolManager =
+       interceptor.getUnmanagedAMPool();
+   Set<String> allUAMIds = unmanagedAMPoolManager.getAllUAMIds();
+   Assert.assertNotNull(allUAMIds);
+   // assertEquals reports the actual size on failure
+   Assert.assertEquals(2, allUAMIds.size());
+   Assert.assertTrue(allUAMIds.contains("SC-1"));
+   Assert.assertTrue(allUAMIds.contains("SC-2"));
+
+   // Make sure all async hb threads are done
+   interceptor.drainAllAsyncQueue(true);
+
+   return containers;
+ }
+
+ /**
+  * Takes the given secondary sub-cluster out of service: deregisters it and
+  * calls {@code setRunningMode(false)} on its mock RM.
+  *
+  * @param subClusterId sub-cluster to take offline
+  * @throws YarnException declared for the deregistration call
+  */
+ private void offlineSubClusterSC1(SubClusterId subClusterId)
+     throws YarnException {
+   ConcurrentHashMap<String, MockResourceManagerFacade> secondaryRMs =
+       interceptor.getSecondaryRMs();
+
+   // Deregister first, then stop the matching mock RM
+   deRegisterSubCluster(subClusterId);
+   MockResourceManagerFacade offlineRM =
+       secondaryRMs.get(subClusterId.getId());
+   offlineRM.setRunningMode(false);
+ }
+
+ /**
+  * Replaces {@code interceptor} with a new instance that reuses the current
+  * home and secondary mock RMs, initializes it with the given context and
+  * recovers it from the data read via {@code recoverDataMapForAppAttempt}.
+  *
+  * @param appContext application context used to initialize the new instance
+  * @throws IOException declared for the state-store and recover calls
+  */
+ private void recoverApplicationMaster(AMRMProxyApplicationContext appContext)
+     throws IOException {
+   // Read the data for this attempt before swapping the interceptor
+   Map<String, byte[]> recoveredData =
+       recoverDataMapForAppAttempt(nmStateStore, attemptId);
+
+   // Carry the existing mock RM instances over to the new interceptor
+   MockResourceManagerFacade preservedHomeRM = interceptor.getHomeRM();
+   ConcurrentHashMap<String, MockResourceManagerFacade> preservedSecondaries =
+       interceptor.getSecondaryRMs();
+
+   // Build the replacement interceptor and recover it
+   interceptor = new TestableFederationInterceptor(preservedHomeRM,
+       preservedSecondaries);
+   interceptor.init(appContext);
+   interceptor.recover(recoveredData);
+ }
+
+ /**
+  * Releases every container in {@code containers} except those listed for
+  * this application by the mock RM of the given (offline) sub-cluster.
+  *
+  * <p>Resets {@code lastResponseId} to 0 before releasing.
+  *
+  * @param containers containers obtained earlier in the test
+  * @param subClusterId offline sub-cluster whose containers are skipped
+  * @throws Exception declared for {@code releaseContainersAndAssert}
+  */
+ private void releaseContainers(List<Container> containers,
+     SubClusterId subClusterId) throws Exception {
+
+   ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+       interceptor.getSecondaryRMs();
+   lastResponseId = 0;
+
+   // Get the Container list of the offline sub-cluster; look it up by the
+   // id that was passed in rather than a hard-coded "SC-1"
+   MockResourceManagerFacade offlineRM =
+       secondaries.get(subClusterId.getId());
+   Assert.assertNotNull("No secondary RM found for " + subClusterId.getId(),
+       offlineRM);
+   HashMap<ApplicationId, List<ContainerId>> appContainerMap =
+       offlineRM.getApplicationContainerIdMap();
+   Assert.assertNotNull(appContainerMap);
+   ApplicationId applicationId = attemptId.getApplicationId();
+   Assert.assertNotNull(applicationId);
+   List<ContainerId> offlineContainerIds =
+       appContainerMap.get(applicationId);
+
+   // Release all containers,
+   // Because the sub-cluster is offline, it is necessary to clean up the
+   // Containers allocated by it
+   List<Container> releasable = containers.stream()
+       .filter(container -> !offlineContainerIds.contains(container.getId()))
+       .collect(Collectors.toList());
+   releaseContainersAndAssert(releasable);
+ }
+
+ private void finishApplication() throws IOException, YarnException {
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ FinishApplicationMasterResponse finishResponse =
+ interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finishResponse);
+ Assert.assertEquals(true, finishResponse.getIsUnregistered());
Review Comment:
I will fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]