goiri commented on code in PR #5727: URL: https://github.com/apache/hadoop/pull/5727#discussion_r1231220115
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java: ########## @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; + +public class TokenAndRegisterResponse { Review Comment: Add javadoc. 
########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java: ########## @@ -1251,90 +1252,109 @@ private List<SubClusterId> registerAndAllocateWithNewSubClusters( // Check to see if there are any new sub-clusters in this request // list and create and register Unmanaged AM instance for the new ones List<SubClusterId> newSubClusters = new ArrayList<>(); - for (SubClusterId subClusterId : requests.keySet()) { - if (!subClusterId.equals(this.homeSubClusterId) - && !this.uamPool.hasUAMId(subClusterId.getId())) { - newSubClusters.add(subClusterId); + requests.keySet().stream().forEach(subClusterId -> { + String id = subClusterId.getId(); + if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(id)) { + newSubClusters.add(subClusterId); // Set sub-cluster to be timed out initially - lastSCResponseTime.put(subClusterId, - clock.getTime() - subClusterTimeOut); + lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut); } - } + }); this.uamRegisterFutures.clear(); + for (final SubClusterId scId : newSubClusters) { - Future<?> future = this.threadpool.submit(new Runnable() { - @Override - public void run() { - String subClusterId = scId.getId(); - - // Create a config loaded with federation on and subclusterId - // for each UAM - YarnConfiguration config = new YarnConfiguration(getConf()); - FederationProxyProviderUtil.updateConfForFederation(config, - subClusterId); - - RegisterApplicationMasterResponse uamResponse = null; - Token<AMRMTokenIdentifier> token = null; - try { - ApplicationId applicationId = attemptId.getApplicationId(); - ApplicationSubmissionContext originalSubmissionContext = - federationFacade.getApplicationSubmissionContext(applicationId); - - // For appNameSuffix, use subClusterId of the home sub-cluster - token = uamPool.launchUAM(subClusterId, config, - applicationId, 
amRegistrationResponse.getQueue(), - getApplicationContext().getUser(), homeSubClusterId.toString(), - true, subClusterId, originalSubmissionContext); - - secondaryRelayers.put(subClusterId, - uamPool.getAMRMClientRelayer(subClusterId)); - - uamResponse = uamPool.registerApplicationMaster(subClusterId, - amRegistrationRequest); - } catch (Throwable e) { - LOG.error("Failed to register application master: " + subClusterId - + " Application: " + attemptId, e); - // TODO: UAM registration for this sub-cluster RM - // failed. For now, we ignore the resource requests and continue - // but we need to fix this and handle this situation. One way would - // be to send the request to another RM by consulting the policy. - return; - } - uamRegistrations.put(scId, uamResponse); - LOG.info("Successfully registered unmanaged application master: " - + subClusterId + " ApplicationId: " + attemptId); - try { - uamPool.allocateAsync(subClusterId, requests.get(scId), - new HeartbeatCallBack(scId, true)); - } catch (Throwable e) { - LOG.error("Failed to allocate async to " + subClusterId - + " Application: " + attemptId, e); - } + Future<?> future = this.threadpool.submit(() -> { - // Save the UAM token in registry or NMSS - try { - if (registryClient != null) { - registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(), - subClusterId, token); - } else if (getNMStateStore() != null) { - getNMStateStore().storeAMRMProxyAppContextEntry(attemptId, - NMSS_SECONDARY_SC_PREFIX + subClusterId, - token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT)); - } - } catch (Throwable e) { - LOG.error("Failed to persist UAM token from " + subClusterId - + " Application: " + attemptId, e); + String subClusterId = scId.getId(); + + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, subClusterId); + ApplicationId applicationId = 
attemptId.getApplicationId(); + + RegisterApplicationMasterResponse uamResponse; + Token<AMRMTokenIdentifier> token; + + // LaunchUAM And RegisterApplicationMaster + try { + Review Comment: Avoid this blank line. ########## hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java: ########## @@ -1432,4 +1432,53 @@ private void finishApplication() throws IOException, YarnException { Assert.assertNotNull(finishResponse); Assert.assertTrue(finishResponse.getIsUnregistered()); } + + @Test + public void testLaunchUAMAndRegisterApplicationMasterRetry() throws Exception { + + UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); + interceptor.setRetryCount(2); + + ugi.doAs((PrivilegedExceptionAction<Object>) () -> { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the first batch of containers, with sc1 active + registerSubCluster(SubClusterId.newInstance("SC-1")); + + int numberOfContainers = 3; + List<Container> containers = getContainersAndAssert(numberOfContainers, numberOfContainers); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + 
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finishResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finishResponse); + Assert.assertEquals(true, finishResponse.getIsUnregistered()); Review Comment: Use Assert.assertTrue(...) here instead of Assert.assertEquals(true, ...). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
