http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index d63b2cf..ebd85bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -34,12 +34,13 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -60,15 +61,19 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +87,7 @@ import com.google.common.base.Preconditions; * pipeline is a chain of interceptor instances that can inspect and modify the * request/response as needed. */ -public class AMRMProxyService extends AbstractService implements +public class AMRMProxyService extends CompositeService implements ApplicationMasterProtocol { private static final Logger LOG = LoggerFactory .getLogger(AMRMProxyService.class); @@ -96,6 +101,7 @@ public class AMRMProxyService extends AbstractService implements private InetSocketAddress listenerEndpoint; private AMRMProxyTokenSecretManager secretManager; private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap; + private RegistryOperations registry; /** * Creates an instance of the service. 
@@ -118,10 +124,23 @@ public class AMRMProxyService extends AbstractService implements @Override protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); this.secretManager = new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore()); this.secretManager.init(conf); + + // Both second app attempt and NM restart within Federation need registry + if (conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED) + || conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) { + this.registry = FederationStateStoreFacade.createInstance(conf, + YarnConfiguration.YARN_REGISTRY_CLASS, + YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS, + RegistryOperations.class); + addService(this.registry); + } + + super.serviceInit(conf); } @Override @@ -203,6 +222,8 @@ public class AMRMProxyService extends AbstractService implements amrmToken = new Token<>(); amrmToken.decodeFromUrlString( new String(contextEntry.getValue(), "UTF-8")); + // Clear the service field, as if RM just issued the token + amrmToken.setService(new Text()); } } @@ -214,12 +235,36 @@ public class AMRMProxyService extends AbstractService implements throw new IOException("No user found for app attempt " + attemptId); } + // Regenerate the local AMRMToken for the AM Token<AMRMTokenIdentifier> localToken = this.secretManager.createAndGetAMRMToken(attemptId); + // Retrieve the AM container credentials from NM context + Credentials amCred = null; + for (Container container : this.nmContext.getContainers().values()) { + LOG.debug("From NM Context container " + container.getContainerId()); + if (container.getContainerId().getApplicationAttemptId().equals( + attemptId) && container.getContainerTokenIdentifier() != null) { + LOG.debug("Container type " + + container.getContainerTokenIdentifier().getContainerType()); + if (container.getContainerTokenIdentifier() + .getContainerType() == 
ContainerType.APPLICATION_MASTER) { + LOG.info("AM container {} found in context, has credentials: {}", + container.getContainerId(), + (container.getCredentials() != null)); + amCred = container.getCredentials(); + } + } + } + if (amCred == null) { + LOG.error("No credentials found for AM container of {}. " + + "Yarn registry access might not work", attemptId); + } + + // Create the intercepter pipeline for the AM initializePipeline(attemptId, user, amrmToken, localToken, - entry.getValue(), true); - } catch (Exception e) { + entry.getValue(), true, amCred); + } catch (IOException e) { LOG.error("Exception when recovering " + attemptId + ", removing it from NMStateStore and move on", e); this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId); @@ -326,7 +371,7 @@ public class AMRMProxyService extends AbstractService implements initializePipeline(appAttemptId, containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken, - localToken, null, false); + localToken, null, false, credentials); } /** @@ -342,7 +387,8 @@ public class AMRMProxyService extends AbstractService implements protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token<AMRMTokenIdentifier> amrmToken, Token<AMRMTokenIdentifier> localToken, - Map<String, byte[]> recoveredDataMap, boolean isRecovery) { + Map<String, byte[]> recoveredDataMap, boolean isRecovery, + Credentials credentials) { RequestInterceptorChainWrapper chainWrapper = null; synchronized (applPipelineMap) { if (applPipelineMap @@ -404,8 +450,9 @@ public class AMRMProxyService extends AbstractService implements try { RequestInterceptor interceptorChain = this.createRequestInterceptorChain(); - interceptorChain.init(createApplicationMasterContext(this.nmContext, - applicationAttemptId, user, amrmToken, localToken)); + interceptorChain.init( + createApplicationMasterContext(this.nmContext, applicationAttemptId, + user, amrmToken, localToken, credentials, this.registry)); if 
(isRecovery) { if (recoveredDataMap == null) { throw new YarnRuntimeException( @@ -497,14 +544,12 @@ public class AMRMProxyService extends AbstractService implements allocateResponse.setAMRMToken(null); org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken = - new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>( - token.getIdentifier().array(), token.getPassword().array(), - new Text(token.getKind()), new Text(token.getService())); - - context.setAMRMToken(newToken); + ConverterUtils.convertFromYarn(token, (Text) null); - // Update the AMRMToken in context map in NM state store - if (this.nmContext.getNMStateStore() != null) { + // Update the AMRMToken in context map, and in NM state store if it is + // different + if (context.setAMRMToken(newToken) + && this.nmContext.getNMStateStore() != null) { try { this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY, @@ -547,10 +592,12 @@ public class AMRMProxyService extends AbstractService implements private AMRMProxyApplicationContext createApplicationMasterContext( Context context, ApplicationAttemptId applicationAttemptId, String user, Token<AMRMTokenIdentifier> amrmToken, - Token<AMRMTokenIdentifier> localToken) { + Token<AMRMTokenIdentifier> localToken, Credentials credentials, + RegistryOperations registryImpl) { AMRMProxyApplicationContextImpl appContext = new AMRMProxyApplicationContextImpl(context, getConfig(), - applicationAttemptId, user, amrmToken, localToken); + applicationAttemptId, user, amrmToken, localToken, credentials, + registryImpl); return appContext; }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 33cfca3..ef5e061 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -34,6 +34,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -42,6 +44,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import 
org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -56,17 +59,20 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +151,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private UserGroupInformation appOwner; + private FederationRegistryClient registryClient; + /** * Creates an instance of the FederationInterceptor class. 
*/ @@ -179,6 +187,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } catch (Exception ex) { throw new YarnRuntimeException(ex); } + // Add all app tokens for Yarn Registry access + if (this.registryClient != null && appContext.getCredentials() != null) { + this.appOwner.addCredentials(appContext.getCredentials()); + } this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); @@ -192,6 +204,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.uamPool.init(conf); this.uamPool.start(); + + if (appContext.getRegistryClient() != null) { + this.registryClient = new FederationRegistryClient(conf, + appContext.getRegistryClient(), this.appOwner); + } } /** @@ -250,20 +267,27 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ this.amRegistrationResponse = this.homeRM.registerApplicationMaster(request); + if (this.amRegistrationResponse + .getContainersFromPreviousAttempts() != null) { + cacheAllocatedContainers( + this.amRegistrationResponse.getContainersFromPreviousAttempts(), + this.homeSubClusterId); + } + + ApplicationId appId = + getApplicationContext().getApplicationAttemptId().getApplicationId(); + reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId); // the queue this application belongs will be used for getting // AMRMProxy policy from state store. String queue = this.amRegistrationResponse.getQueue(); if (queue == null) { - LOG.warn("Received null queue for application " - + getApplicationContext().getApplicationAttemptId().getApplicationId() - + " from home sub-cluster. Will use default queue name " + LOG.warn("Received null queue for application " + appId + + " from home subcluster. 
Will use default queue name " + YarnConfiguration.DEFAULT_QUEUE_NAME + " for getting AMRMProxyPolicy"); } else { - LOG.info("Application " - + getApplicationContext().getApplicationAttemptId().getApplicationId() - + " belongs to queue " + queue); + LOG.info("Application " + appId + " belongs to queue " + queue); } // Initialize the AMRMProxyPolicy @@ -304,7 +328,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister( requests.get(this.homeSubClusterId), this.homeRM, this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId()); + getApplicationContext().getApplicationAttemptId().getApplicationId()); // Notify policy of home response try { @@ -393,8 +417,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // request to the home resource manager on this thread. FinishApplicationMasterResponse homeResponse = AMRMClientUtils.finishAMWithReRegister(request, this.homeRM, - this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId()); + this.amRegistrationRequest, getApplicationContext() + .getApplicationAttemptId().getApplicationId()); if (subClusterIds.size() > 0) { // Wait for other sub-cluster resource managers to return the @@ -425,6 +449,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (failedToUnRegister) { homeResponse.setIsUnregistered(false); + } else { + // Clean up UAMs only when the app finishes successfully, so that no more + // attempt will be launched. 
+ this.uamPool.stop(); + if (this.registryClient != null) { + this.registryClient.removeAppFromRegistry(getApplicationContext() + .getApplicationAttemptId().getApplicationId()); + } } return homeResponse; } @@ -442,9 +474,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ @Override public void shutdown() { - if (this.uamPool != null) { - this.uamPool.stop(); - } + // Do not stop uamPool service and kill UAMs here because of possible second + // app attempt if (threadpool != null) { try { threadpool.shutdown(); @@ -456,6 +487,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** + * Only for unit test cleanup. + */ + @VisibleForTesting + protected void cleanupRegistry() { + if (this.registryClient != null) { + this.registryClient.cleanAllApplications(); + } + } + + /** * Create the UAM pool manager for secondary sub-clsuters. For unit test to * override. * @@ -486,6 +527,120 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } + private void mergeRegisterResponse( + RegisterApplicationMasterResponse homeResponse, + RegisterApplicationMasterResponse otherResponse) { + + if (!isNullOrEmpty(otherResponse.getContainersFromPreviousAttempts())) { + if (!isNullOrEmpty(homeResponse.getContainersFromPreviousAttempts())) { + homeResponse.getContainersFromPreviousAttempts() + .addAll(otherResponse.getContainersFromPreviousAttempts()); + } else { + homeResponse.setContainersFromPreviousAttempts( + otherResponse.getContainersFromPreviousAttempts()); + } + } + + if (!isNullOrEmpty(otherResponse.getNMTokensFromPreviousAttempts())) { + if (!isNullOrEmpty(homeResponse.getNMTokensFromPreviousAttempts())) { + homeResponse.getNMTokensFromPreviousAttempts() + .addAll(otherResponse.getNMTokensFromPreviousAttempts()); + } else { + homeResponse.setNMTokensFromPreviousAttempts( + otherResponse.getNMTokensFromPreviousAttempts()); + } + } + } + + /** + * Try re-attach to all existing and running UAMs in 
secondary sub-clusters + * launched by previous application attempts if any. All running containers in + * the UAMs will be combined into the registerResponse. For the first attempt, + * the registry will be empty for this application and thus no-op here. + */ + protected void reAttachUAMAndMergeRegisterResponse( + RegisterApplicationMasterResponse homeResponse, + final ApplicationId appId) { + + if (this.registryClient == null) { + // Both AMRMProxy HA and NM work preserving restart is not enabled + LOG.warn("registryClient is null, skip attaching existing UAM if any"); + return; + } + + // Load existing running UAMs from the previous attempts from + // registry, if any + Map<String, Token<AMRMTokenIdentifier>> uamMap = + this.registryClient.loadStateFromRegistry(appId); + if (uamMap.size() == 0) { + LOG.info("No existing UAM for application {} found in Yarn Registry", + appId); + return; + } + LOG.info("Found {} existing UAMs for application {} in Yarn Registry. " + + "Reattaching in parallel", uamMap.size(), appId); + + ExecutorCompletionService<RegisterApplicationMasterResponse> + completionService = new ExecutorCompletionService<>(threadpool); + + for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) { + final SubClusterId subClusterId = + SubClusterId.newInstance(entry.getKey()); + final Token<AMRMTokenIdentifier> amrmToken = entry.getValue(); + + completionService + .submit(new Callable<RegisterApplicationMasterResponse>() { + @Override + public RegisterApplicationMasterResponse call() throws Exception { + RegisterApplicationMasterResponse response = null; + try { + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId.getId()); + + uamPool.reAttachUAM(subClusterId.getId(), config, appId, + amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), 
homeSubClusterId.getId(), + amrmToken); + + response = uamPool.registerApplicationMaster( + subClusterId.getId(), amRegistrationRequest); + + if (response != null + && response.getContainersFromPreviousAttempts() != null) { + cacheAllocatedContainers( + response.getContainersFromPreviousAttempts(), + subClusterId); + } + LOG.info("UAM {} reattached for {}", subClusterId, appId); + } catch (Throwable e) { + LOG.error( + "Reattaching UAM " + subClusterId + " failed for " + appId, + e); + } + return response; + } + }); + } + + // Wait for the re-attach responses + for (int i = 0; i < uamMap.size(); i++) { + try { + Future<RegisterApplicationMasterResponse> future = + completionService.take(); + RegisterApplicationMasterResponse registerResponse = future.get(); + if (registerResponse != null) { + LOG.info("Merging register response for {}", appId); + mergeRegisterResponse(homeResponse, registerResponse); + } + } catch (Exception e) { + LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e); + } + } + } + private SubClusterId getSubClusterForNode(String nodeName) { SubClusterId subClusterId = null; try { @@ -655,6 +810,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor { responses.add(response); } + // Save the new AMRMToken for the UAM in registry if present + if (response.getAMRMToken() != null) { + Token<AMRMTokenIdentifier> newToken = ConverterUtils + .convertFromYarn(response.getAMRMToken(), (Text) null); + // Update the token in registry + if (registryClient != null) { + registryClient + .writeAMRMTokenForUAM( + getApplicationContext().getApplicationAttemptId() + .getApplicationId(), + subClusterId.getId(), newToken); + } + } + // Notify policy of secondary sub-cluster responses try { policyInterpreter.notifyOfResponse(subClusterId, response); @@ -714,20 +883,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor { subClusterId); RegisterApplicationMasterResponse uamResponse = null; + 
Token<AMRMTokenIdentifier> token = null; try { // For appNameSuffix, use subClusterId of the home sub-cluster - uamResponse = uamPool.createAndRegisterNewUAM(subClusterId, - registerRequest, config, + token = uamPool.launchUAM(subClusterId, config, appContext.getApplicationAttemptId().getApplicationId(), amRegistrationResponse.getQueue(), appContext.getUser(), - homeSubClusterId.toString()); + homeSubClusterId.toString(), registryClient != null); + + uamResponse = uamPool.registerApplicationMaster(subClusterId, + registerRequest); } catch (Throwable e) { LOG.error("Failed to register application master: " + subClusterId + " Application: " + appContext.getApplicationAttemptId(), e); } return new RegisterApplicationMasterResponseInfo(uamResponse, - SubClusterId.newInstance(subClusterId)); + SubClusterId.newInstance(subClusterId), token); } }); } @@ -752,6 +924,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { + getApplicationContext().getApplicationAttemptId()); successfulRegistrations.put(uamResponse.getSubClusterId(), uamResponse.getResponse()); + + if (registryClient != null) { + registryClient.writeAMRMTokenForUAM( + getApplicationContext().getApplicationAttemptId() + .getApplicationId(), + uamResponse.getSubClusterId().getId(), + uamResponse.getUamToken()); + } } } catch (Exception e) { LOG.warn("Failed to register unmanaged application master: " @@ -1087,11 +1267,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { private static class RegisterApplicationMasterResponseInfo { private RegisterApplicationMasterResponse response; private SubClusterId subClusterId; + private Token<AMRMTokenIdentifier> uamToken; RegisterApplicationMasterResponseInfo( - RegisterApplicationMasterResponse response, SubClusterId subClusterId) { + RegisterApplicationMasterResponse response, SubClusterId subClusterId, + Token<AMRMTokenIdentifier> uamToken) { this.response = response; this.subClusterId = subClusterId; + this.uamToken = 
uamToken; } public RegisterApplicationMasterResponse getResponse() { @@ -1101,6 +1284,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public SubClusterId getSubClusterId() { return subClusterId; } + + public Token<AMRMTokenIdentifier> getUamToken() { + return uamToken; + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 831ba0b..44bfc68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -353,10 +353,6 @@ public class ContainerManagerImpl extends CompositeService implements rsrcLocalizationSrvc.recoverLocalizedResources( stateStore.loadLocalizationState()); - if (this.amrmProxyEnabled) { - this.getAMRMProxyService().recover(); - } - RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); for (ContainerManagerApplicationProto proto : appsState.getApplications()) { @@ -373,6 +369,11 @@ public class ContainerManagerImpl extends CompositeService implements recoverContainer(rcs); } + // Recovery AMRMProxy state after apps and containers are recovered + if 
(this.amrmProxyEnabled) { + this.getAMRMProxyService().recover(); + } + //Dispatching the RECOVERY_COMPLETED event through the dispatcher //so that all the paused, scheduled and queued containers will //be scheduled for execution on availability of resources. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 3c57496..da1d047 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -56,10 +57,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; -import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Records; import org.junit.After; @@ -179,6 +180,15 @@ public abstract class BaseAMRMProxyTest { return new NMContext(null, null, null, null, stateStore, false, this.conf); } + protected List<ContainerId> getCompletedContainerIds( + List<ContainerStatus> containerStatus) { + List<ContainerId> ret = new ArrayList<>(); + for (ContainerStatus status : containerStatus) { + ret.add(status.getContainerId()); + } + return ret; + } + /** * This helper method will invoke the specified function in parallel for each * end point in the specified list using a thread pool and return the @@ -623,7 +633,7 @@ public abstract class BaseAMRMProxyTest { */ public void initApp(ApplicationAttemptId applicationId, String user) { super.initializePipeline(applicationId, user, - new Token<AMRMTokenIdentifier>(), null, null, false); + new Token<AMRMTokenIdentifier>(), null, null, false, null); } public void stopApp(ApplicationId applicationId) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index 937ede5..b955311 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -444,7 +444,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2); getAMRMProxyService().initializePipeline(applicationAttemptId, user, - new Token<AMRMTokenIdentifier>(), null, null, false); + new Token<AMRMTokenIdentifier>(), null, null, false, null); RequestInterceptorChainWrapper chain2 = getAMRMProxyService().getPipelines().get(appId); @@ -531,16 +531,14 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - // The way the mock resource manager is setup, it will return the containers - // that were released in the response. This is done because the UAMs run - // asynchronously and we need to if all the resource managers received the - // release it. The containers sent by the mock resource managers will be + // We need to make sure all the resource managers received the + // release list. 
The containers sent by the mock resource managers will be // aggregated and returned back to us and we can assert if all the release // lists reached the sub-clusters - List<Container> containersForReleasedContainerIds = - new ArrayList<Container>(); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + List<ContainerId> containersForReleasedContainerIds = new ArrayList<>(); + List<ContainerId> newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); // Send max 10 heart beats to receive all the containers. If not, we will // fail the test @@ -554,8 +552,9 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { "new AMRMToken from RM should have been nulled by AMRMProxyService", allocateResponse.getAMRMToken()); - containersForReleasedContainerIds.addAll(allocateResponse - .getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " + Integer.toString(allocateResponse.getAllocatedContainers() http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 3db0e35..aa7ed69 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -19,16 +19,20 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -59,6 +63,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ 
-79,7 +87,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { private TestableFederationInterceptor interceptor; private MemoryFederationStateStore stateStore; + private NMStateStoreService nmStateStore; + private RegistryOperations registry; + private Context nmContext; private int testAppId; private ApplicationAttemptId attemptId; @@ -93,15 +104,28 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf()); + nmStateStore = new NMMemoryStateStoreService(); + nmStateStore.init(getConf()); + nmStateStore.start(); + + registry = new FSRegistryOperationsService(); + registry.init(getConf()); + registry.start(); + testAppId = 1; attemptId = getApplicationAttemptId(testAppId); - interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(), - attemptId, "test-user", null, null)); + nmContext = + new NMContext(null, null, null, null, nmStateStore, false, getConf()); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), + attemptId, "test-user", null, null, null, registry)); + interceptor.cleanupRegistry(); } @Override public void tearDown() { + interceptor.cleanupRegistry(); interceptor.shutdown(); + registry.stop(); super.tearDown(); } @@ -207,18 +231,17 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); - // The way the mock resource manager is setup, it will return the containers - // that were released in the allocated containers. The release request will - // be split and handled by the corresponding UAM. 
The release containers - // returned by the mock resource managers will be aggregated and returned - // back to us and we can check if total request size and returned size are - // the same - List<Container> containersForReleasedContainerIds = - new ArrayList<Container>(); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + // The release request will be split and handled by the corresponding UAM. + // The release containers returned by the mock resource managers will be + // aggregated and returned back to us and we can check if total request size + // and returned size are the same + List<ContainerId> containersForReleasedContainerIds = + new ArrayList<ContainerId>(); + List<ContainerId> newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in the original request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); // Send max 10 heart beats to receive all the containers. 
If not, we will // fail the test @@ -228,11 +251,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { allocateResponse = interceptor.allocate(Records.newRecord(AllocateRequest.class)); Assert.assertNotNull(allocateResponse); - containersForReleasedContainerIds - .addAll(allocateResponse.getAllocatedContainers()); + newlyFinished = getCompletedContainerIds( + allocateResponse.getCompletedContainersStatuses()); + containersForReleasedContainerIds.addAll(newlyFinished); LOG.info("Number of containers received in this request: " - + Integer.toString(allocateResponse.getAllocatedContainers().size())); + + Integer.toString(newlyFinished.size())); LOG.info("Total number of containers received: " + Integer.toString(containersForReleasedContainerIds.size())); Thread.sleep(10); @@ -547,4 +571,74 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { Assert.assertEquals(1, response.getUpdatedContainers().size()); Assert.assertEquals(1, response.getUpdateErrors().size()); } + + @Test + public void testSecondAttempt() throws Exception { + ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); + userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate one batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + int numberOfContainers = 3; + List<Container> containers = + 
getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + for (Container c : containers) { + System.out.println(c.getId() + " ha"); + } + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Preserve the mock RM instances for secondaries + ConcurrentHashMap<String, MockResourceManagerFacade> secondaries = + interceptor.getSecondaryRMs(); + + // Increase the attemptId and create a new intercepter instance for it + attemptId = ApplicationAttemptId.newInstance( + attemptId.getApplicationId(), attemptId.getAttemptId() + 1); + + interceptor = new TestableFederationInterceptor(null, secondaries); + interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, + getConf(), attemptId, "test-user", null, null, null, registry)); + registerResponse = interceptor.registerApplicationMaster(registerReq); + + // Should re-attach secondaries and get the three running containers + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(numberOfContainers, + registerResponse.getContainersFromPreviousAttempts().size()); + + // Release all containers + releaseContainersAndAssert( + registerResponse.getContainersFromPreviousAttempts()); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + return null; + } + }); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index d4b8735..23c80ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -44,6 +44,15 @@ public class TestableFederationInterceptor extends FederationInterceptor { private AtomicInteger runningIndex = new AtomicInteger(0); private MockResourceManagerFacade mockRm; + public TestableFederationInterceptor() { + } + + public TestableFederationInterceptor(MockResourceManagerFacade homeRM, + ConcurrentHashMap<String, MockResourceManagerFacade> secondaries) { + mockRm = homeRM; + secondaryResourceManagers = secondaries; + } + @Override protected UnmanagedAMPoolManager createUnmanagedAMPoolManager( ExecutorService threadPool) { @@ -68,7 +77,7 @@ public class TestableFederationInterceptor extends FederationInterceptor { // We create one instance of the mock resource manager per sub cluster. 
Keep // track of the instances of the RMs in the map keyed by the sub cluster id synchronized (this.secondaryResourceManagers) { - if (this.secondaryResourceManagers.contains(subClusterId)) { + if (this.secondaryResourceManagers.containsKey(subClusterId)) { return (T) this.secondaryResourceManagers.get(subClusterId); } else { // The running index here is used to simulate different RM_EPOCH to @@ -91,6 +100,15 @@ public class TestableFederationInterceptor extends FederationInterceptor { } } + protected MockResourceManagerFacade getHomeRM() { + return mockRm; + } + + protected ConcurrentHashMap<String, MockResourceManagerFacade> + getSecondaryRMs() { + return secondaryResourceManagers; + } + /** * Extends the UnmanagedAMPoolManager and overrides methods to provide a * testable implementation of UnmanagedAMPoolManager. @@ -104,9 +122,9 @@ public class TestableFederationInterceptor extends FederationInterceptor { @Override public UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { return new TestableUnmanagedApplicationManager(conf, appId, queueName, - submitter, appNameSuffix); + submitter, appNameSuffix, keepContainersAcrossApplicationAttempts); } } @@ -119,8 +137,9 @@ public class TestableFederationInterceptor extends FederationInterceptor { public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { - super(conf, appId, queueName, submitter, appNameSuffix); + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + super(conf, appId, queueName, submitter, appNameSuffix, + keepContainersAcrossApplicationAttempts); } /** 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 7a225c8..8245fd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.service.AbstractService; @@ -959,9 +960,10 @@ public class MiniYARNCluster extends CompositeService { protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token<AMRMTokenIdentifier> amrmToken, Token<AMRMTokenIdentifier> localToken, - Map<String, byte[]> recoveredDataMap, boolean isRecovery) { + Map<String, byte[]> recoveredDataMap, boolean isRecovery, + Credentials credentials) { super.initializePipeline(applicationAttemptId, user, amrmToken, - localToken, recoveredDataMap, isRecovery); + localToken, recoveredDataMap, isRecovery, credentials); RequestInterceptor rt = getPipelines() .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); // The DefaultRequestInterceptor will generally 
be the last http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md index ef0f713..40d46cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md @@ -141,6 +141,8 @@ The figure shows a sequence diagram for the following job execution flow: b. The security tokens are also modified by the NM when launching the AM, so that the AM can only talk with the AMRMProxy. Any future communication from AM to the YARN RM is mediated by the AMRMProxy. 7. The AM will then request containers using the locality information exposed by HDFS. 8. Based on a policy the AMRMProxy can impersonate the AM on other sub-clusters, by submitting an Unmanaged AM, and by forwarding the AM heartbeats to relevant sub-clusters. +    a. Federation supports multiple application attempts with AMRMProxy HA. AM containers will have different attempt ids in the home sub-cluster, but the same Unmanaged AM in each secondary sub-cluster will be used across attempts. +    b. When AMRMProxy HA is enabled, UAM tokens will be stored in the YARN Registry. In the registerApplicationMaster call of each application attempt, AMRMProxy will fetch the existing UAM tokens from the registry (if any) and re-attach to the existing UAMs. 9. The AMRMProxy will use both locality information and a pluggable policy configured in the state-store to decide whether to forward the resource requests received by the AM to the Home RM or to one (or more) Secondary RMs. In Figure 1, we show the case in which the AMRMProxy decides to forward the request to the secondary RM. 10.
The secondary RM will provide the AMRMProxy with valid container tokens to start a new container on some node in its sub-cluster. This mechanism ensures that each sub-cluster uses its own security tokens and avoids the need for a cluster wide shared secret to create tokens. 11. The AMRMProxy forwards the allocation response back to the AM. @@ -262,16 +264,17 @@ These are extra configurations that should appear in the **conf/yarn-site.xml** | Property | Example | Description | |:---- |:---- | -| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled. -|`yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor. +| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy is enabled. | +| `yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A comma-separated list of interceptors to be run at the amrmproxy. For federation the last step in the pipeline should be the FederationInterceptor. | | `yarn.client.failover-proxy-provider` | `org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider` | The class used to connect to the RMs by looking up the membership information in federation state-store. This must be set if federation is enabled, even if RM HA is not enabled.| Optional: | Property | Example | Description | |:---- |:---- | -|`yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. | -|`yarn.federation.cache-ttl.secs` | `300` | The time to leave for the AMRMProxy cache. 
Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. | +| `yarn.nodemanager.amrmproxy.ha.enable` | `true` | Whether or not AMRMProxy HA is enabled, to support multiple application attempts. | +| `yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn through many DB connections quickly. | +| `yarn.federation.cache-ttl.secs` | `300` | The time to live for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. | Running a Sample Job -------------------- --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org