http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index d63b2cf..ebd85bf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -34,12 +34,13 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@@ -60,15 +61,19 @@ import 
org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
 import 
org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +87,7 @@ import com.google.common.base.Preconditions;
  * pipeline is a chain of interceptor instances that can inspect and modify the
  * request/response as needed.
  */
-public class AMRMProxyService extends AbstractService implements
+public class AMRMProxyService extends CompositeService implements
     ApplicationMasterProtocol {
   private static final Logger LOG = LoggerFactory
       .getLogger(AMRMProxyService.class);
@@ -96,6 +101,7 @@ public class AMRMProxyService extends AbstractService 
implements
   private InetSocketAddress listenerEndpoint;
   private AMRMProxyTokenSecretManager secretManager;
   private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
+  private RegistryOperations registry;
 
   /**
    * Creates an instance of the service.
@@ -118,10 +124,23 @@ public class AMRMProxyService extends AbstractService 
implements
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
     this.secretManager =
         new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore());
     this.secretManager.init(conf);
+
+    // Both second app attempt and NM restart within Federation need registry
+    if (conf.getBoolean(YarnConfiguration.AMRM_PROXY_HA_ENABLED,
+        YarnConfiguration.DEFAULT_AMRM_PROXY_HA_ENABLED)
+        || conf.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED,
+            YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)) {
+      this.registry = FederationStateStoreFacade.createInstance(conf,
+          YarnConfiguration.YARN_REGISTRY_CLASS,
+          YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS,
+          RegistryOperations.class);
+      addService(this.registry);
+    }
+
+    super.serviceInit(conf);
   }
 
   @Override
@@ -203,6 +222,8 @@ public class AMRMProxyService extends AbstractService 
implements
             amrmToken = new Token<>();
             amrmToken.decodeFromUrlString(
                 new String(contextEntry.getValue(), "UTF-8"));
+            // Clear the service field, as if RM just issued the token
+            amrmToken.setService(new Text());
           }
         }
 
@@ -214,12 +235,36 @@ public class AMRMProxyService extends AbstractService 
implements
           throw new IOException("No user found for app attempt " + attemptId);
         }
 
+        // Regenerate the local AMRMToken for the AM
         Token<AMRMTokenIdentifier> localToken =
             this.secretManager.createAndGetAMRMToken(attemptId);
 
+        // Retrieve the AM container credentials from NM context
+        Credentials amCred = null;
+        for (Container container : this.nmContext.getContainers().values()) {
+          LOG.debug("From NM Context container {}", container.getContainerId());
+          if (container.getContainerId().getApplicationAttemptId().equals(
+              attemptId) && container.getContainerTokenIdentifier() != null) {
+            LOG.debug("Container type {}",
+                container.getContainerTokenIdentifier().getContainerType());
+            if (container.getContainerTokenIdentifier()
+                .getContainerType() == ContainerType.APPLICATION_MASTER) {
+              LOG.info("AM container {} found in context, has credentials: {}",
+                  container.getContainerId(),
+                  (container.getCredentials() != null));
+              amCred = container.getCredentials();
+            }
+          }
+        }
+        if (amCred == null) {
+          LOG.error("No credentials found for AM container of {}. "
+              + "Yarn registry access might not work", attemptId);
+        }
+
+        // Create the interceptor pipeline for the AM
         initializePipeline(attemptId, user, amrmToken, localToken,
-            entry.getValue(), true);
-      } catch (Exception e) {
+            entry.getValue(), true, amCred);
+      } catch (IOException e) {
         LOG.error("Exception when recovering " + attemptId
             + ", removing it from NMStateStore and move on", e);
         this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
@@ -326,7 +371,7 @@ public class AMRMProxyService extends AbstractService 
implements
 
     initializePipeline(appAttemptId,
         containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken,
-        localToken, null, false);
+        localToken, null, false, credentials);
   }
 
   /**
@@ -342,7 +387,8 @@ public class AMRMProxyService extends AbstractService 
implements
   protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
       String user, Token<AMRMTokenIdentifier> amrmToken,
       Token<AMRMTokenIdentifier> localToken,
-      Map<String, byte[]> recoveredDataMap, boolean isRecovery) {
+      Map<String, byte[]> recoveredDataMap, boolean isRecovery,
+      Credentials credentials) {
     RequestInterceptorChainWrapper chainWrapper = null;
     synchronized (applPipelineMap) {
       if (applPipelineMap
@@ -404,8 +450,9 @@ public class AMRMProxyService extends AbstractService 
implements
     try {
       RequestInterceptor interceptorChain =
           this.createRequestInterceptorChain();
-      interceptorChain.init(createApplicationMasterContext(this.nmContext,
-          applicationAttemptId, user, amrmToken, localToken));
+      interceptorChain.init(
+          createApplicationMasterContext(this.nmContext, applicationAttemptId,
+              user, amrmToken, localToken, credentials, this.registry));
       if (isRecovery) {
         if (recoveredDataMap == null) {
           throw new YarnRuntimeException(
@@ -497,14 +544,12 @@ public class AMRMProxyService extends AbstractService 
implements
       allocateResponse.setAMRMToken(null);
 
       org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
-          new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
-              token.getIdentifier().array(), token.getPassword().array(),
-              new Text(token.getKind()), new Text(token.getService()));
-
-      context.setAMRMToken(newToken);
+          ConverterUtils.convertFromYarn(token, (Text) null);
 
-      // Update the AMRMToken in context map in NM state store
-      if (this.nmContext.getNMStateStore() != null) {
+      // Update the AMRMToken in context map, and in NM state store if it is
+      // different
+      if (context.setAMRMToken(newToken)
+          && this.nmContext.getNMStateStore() != null) {
         try {
           this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
               context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY,
@@ -547,10 +592,12 @@ public class AMRMProxyService extends AbstractService 
implements
   private AMRMProxyApplicationContext createApplicationMasterContext(
       Context context, ApplicationAttemptId applicationAttemptId, String user,
       Token<AMRMTokenIdentifier> amrmToken,
-      Token<AMRMTokenIdentifier> localToken) {
+      Token<AMRMTokenIdentifier> localToken, Credentials credentials,
+      RegistryOperations registryImpl) {
     AMRMProxyApplicationContextImpl appContext =
         new AMRMProxyApplicationContextImpl(context, getConfig(),
-            applicationAttemptId, user, amrmToken, localToken);
+            applicationAttemptId, user, amrmToken, localToken, credentials,
+            registryImpl);
     return appContext;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 33cfca3..ef5e061 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -34,6 +34,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -42,6 +44,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -56,17 +59,20 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import 
org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import 
org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import 
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import 
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
 import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,6 +151,8 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
    */
   private UserGroupInformation appOwner;
 
+  private FederationRegistryClient registryClient;
+
   /**
    * Creates an instance of the FederationInterceptor class.
    */
@@ -179,6 +187,11 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
     } catch (Exception ex) {
       throw new YarnRuntimeException(ex);
     }
+    // Add all app tokens for Yarn Registry access
+    if (appContext.getRegistryClient() != null
+        && appContext.getCredentials() != null) {
+      this.appOwner.addCredentials(appContext.getCredentials());
+    }
 
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
@@ -192,6 +204,11 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
 
     this.uamPool.init(conf);
     this.uamPool.start();
+
+    if (appContext.getRegistryClient() != null) {
+      this.registryClient = new FederationRegistryClient(conf,
+          appContext.getRegistryClient(), this.appOwner);
+    }
   }
 
   /**
@@ -250,20 +267,27 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
      */
     this.amRegistrationResponse =
         this.homeRM.registerApplicationMaster(request);
+    if (this.amRegistrationResponse
+        .getContainersFromPreviousAttempts() != null) {
+      cacheAllocatedContainers(
+          this.amRegistrationResponse.getContainersFromPreviousAttempts(),
+          this.homeSubClusterId);
+    }
+
+    ApplicationId appId =
+        getApplicationContext().getApplicationAttemptId().getApplicationId();
+    reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
 
     // the queue this application belongs will be used for getting
     // AMRMProxy policy from state store.
     String queue = this.amRegistrationResponse.getQueue();
     if (queue == null) {
-      LOG.warn("Received null queue for application "
-          + 
getApplicationContext().getApplicationAttemptId().getApplicationId()
-          + " from home sub-cluster. Will use default queue name "
+      LOG.warn("Received null queue for application " + appId
+          + " from home subcluster. Will use default queue name "
           + YarnConfiguration.DEFAULT_QUEUE_NAME
           + " for getting AMRMProxyPolicy");
     } else {
-      LOG.info("Application "
-          + 
getApplicationContext().getApplicationAttemptId().getApplicationId()
-          + " belongs to queue " + queue);
+      LOG.info("Application " + appId + " belongs to queue " + queue);
     }
 
     // Initialize the AMRMProxyPolicy
@@ -304,7 +328,7 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
       AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
           requests.get(this.homeSubClusterId), this.homeRM,
           this.amRegistrationRequest,
-          getApplicationContext().getApplicationAttemptId());
+          
getApplicationContext().getApplicationAttemptId().getApplicationId());
 
       // Notify policy of home response
       try {
@@ -393,8 +417,8 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
     // request to the home resource manager on this thread.
     FinishApplicationMasterResponse homeResponse =
         AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
-            this.amRegistrationRequest,
-            getApplicationContext().getApplicationAttemptId());
+            this.amRegistrationRequest, getApplicationContext()
+                .getApplicationAttemptId().getApplicationId());
 
     if (subClusterIds.size() > 0) {
       // Wait for other sub-cluster resource managers to return the
@@ -425,6 +449,14 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
 
     if (failedToUnRegister) {
       homeResponse.setIsUnregistered(false);
+    } else {
+      // Clean up UAMs only when the app finishes successfully, so that no more
+      // attempt will be launched.
+      this.uamPool.stop();
+      if (this.registryClient != null) {
+        this.registryClient.removeAppFromRegistry(getApplicationContext()
+            .getApplicationAttemptId().getApplicationId());
+      }
     }
     return homeResponse;
   }
@@ -442,9 +474,8 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
    */
   @Override
   public void shutdown() {
-    if (this.uamPool != null) {
-      this.uamPool.stop();
-    }
+    // Do not stop uamPool service and kill UAMs here because of possible second
+    // app attempt
     if (threadpool != null) {
       try {
         threadpool.shutdown();
@@ -456,6 +487,16 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
   }
 
   /**
+   * Only for unit test cleanup.
+   */
+  @VisibleForTesting
+  protected void cleanupRegistry() {
+    if (this.registryClient != null) {
+      this.registryClient.cleanAllApplications();
+    }
+  }
+
+  /**
    * Create the UAM pool manager for secondary sub-clsuters. For unit test to
    * override.
    *
@@ -486,6 +527,120 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
     }
   }
 
+  private void mergeRegisterResponse(
+      RegisterApplicationMasterResponse homeResponse,
+      RegisterApplicationMasterResponse otherResponse) {
+
+    if (!isNullOrEmpty(otherResponse.getContainersFromPreviousAttempts())) {
+      if (!isNullOrEmpty(homeResponse.getContainersFromPreviousAttempts())) {
+        homeResponse.getContainersFromPreviousAttempts()
+            .addAll(otherResponse.getContainersFromPreviousAttempts());
+      } else {
+        homeResponse.setContainersFromPreviousAttempts(
+            otherResponse.getContainersFromPreviousAttempts());
+      }
+    }
+
+    if (!isNullOrEmpty(otherResponse.getNMTokensFromPreviousAttempts())) {
+      if (!isNullOrEmpty(homeResponse.getNMTokensFromPreviousAttempts())) {
+        homeResponse.getNMTokensFromPreviousAttempts()
+            .addAll(otherResponse.getNMTokensFromPreviousAttempts());
+      } else {
+        homeResponse.setNMTokensFromPreviousAttempts(
+            otherResponse.getNMTokensFromPreviousAttempts());
+      }
+    }
+  }
+
+  /**
+   * Try re-attach to all existing and running UAMs in secondary sub-clusters
+   * launched by previous application attempts if any. All running containers in
+   * the UAMs will be combined into the registerResponse. For the first attempt,
+   * the registry will be empty for this application and thus no-op here.
+   */
+  protected void reAttachUAMAndMergeRegisterResponse(
+      RegisterApplicationMasterResponse homeResponse,
+      final ApplicationId appId) {
+
+    if (this.registryClient == null) {
+      // Neither AMRMProxy HA nor NM work-preserving restart is enabled
+      LOG.warn("registryClient is null, skip attaching existing UAM if any");
+      return;
+    }
+
+    // Load existing running UAMs from the previous attempts from
+    // registry, if any
+    Map<String, Token<AMRMTokenIdentifier>> uamMap =
+        this.registryClient.loadStateFromRegistry(appId);
+    if (uamMap.size() == 0) {
+      LOG.info("No existing UAM for application {} found in Yarn Registry",
+          appId);
+      return;
+    }
+    LOG.info("Found {} existing UAMs for application {} in Yarn Registry. "
+        + "Reattaching in parallel", uamMap.size(), appId);
+
+    ExecutorCompletionService<RegisterApplicationMasterResponse>
+        completionService = new ExecutorCompletionService<>(threadpool);
+
+    for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
+      final SubClusterId subClusterId =
+          SubClusterId.newInstance(entry.getKey());
+      final Token<AMRMTokenIdentifier> amrmToken = entry.getValue();
+
+      completionService
+          .submit(new Callable<RegisterApplicationMasterResponse>() {
+            @Override
+            public RegisterApplicationMasterResponse call() throws Exception {
+              RegisterApplicationMasterResponse response = null;
+              try {
+                // Create a config loaded with federation on and subclusterId
+                // for each UAM
+                YarnConfiguration config = new YarnConfiguration(getConf());
+                FederationProxyProviderUtil.updateConfForFederation(config,
+                    subClusterId.getId());
+
+                uamPool.reAttachUAM(subClusterId.getId(), config, appId,
+                    amRegistrationResponse.getQueue(),
+                    getApplicationContext().getUser(), 
homeSubClusterId.getId(),
+                    amrmToken);
+
+                response = uamPool.registerApplicationMaster(
+                    subClusterId.getId(), amRegistrationRequest);
+
+                if (response != null
+                    && response.getContainersFromPreviousAttempts() != null) {
+                  cacheAllocatedContainers(
+                      response.getContainersFromPreviousAttempts(),
+                      subClusterId);
+                }
+                LOG.info("UAM {} reattached for {}", subClusterId, appId);
+              } catch (Throwable e) {
+                LOG.error(
+                    "Reattaching UAM " + subClusterId + " failed for " + appId,
+                    e);
+              }
+              return response;
+            }
+          });
+    }
+
+    // Wait for the re-attach responses
+    for (int i = 0; i < uamMap.size(); i++) {
+      try {
+        Future<RegisterApplicationMasterResponse> future =
+            completionService.take();
+        RegisterApplicationMasterResponse registerResponse = future.get();
+        if (registerResponse != null) {
+          LOG.info("Merging register response for {}", appId);
+          mergeRegisterResponse(homeResponse, registerResponse);
+        }
+      } catch (Exception e) {
+        LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e);
+      }
+    }
+  }
+
   private SubClusterId getSubClusterForNode(String nodeName) {
     SubClusterId subClusterId = null;
     try {
@@ -655,6 +810,20 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
                 responses.add(response);
               }
 
+              // Save the new AMRMToken for the UAM in registry if present
+              if (response.getAMRMToken() != null) {
+                Token<AMRMTokenIdentifier> newToken = ConverterUtils
+                    .convertFromYarn(response.getAMRMToken(), (Text) null);
+                // Update the token in registry
+                if (registryClient != null) {
+                  registryClient
+                      .writeAMRMTokenForUAM(
+                          getApplicationContext().getApplicationAttemptId()
+                              .getApplicationId(),
+                          subClusterId.getId(), newToken);
+                }
+              }
+
               // Notify policy of secondary sub-cluster responses
               try {
                 policyInterpreter.notifyOfResponse(subClusterId, response);
@@ -714,20 +883,23 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
                     subClusterId);
 
                 RegisterApplicationMasterResponse uamResponse = null;
+                Token<AMRMTokenIdentifier> token = null;
                 try {
                   // For appNameSuffix, use subClusterId of the home sub-cluster
-                  uamResponse = uamPool.createAndRegisterNewUAM(subClusterId,
-                      registerRequest, config,
+                  token = uamPool.launchUAM(subClusterId, config,
                       appContext.getApplicationAttemptId().getApplicationId(),
                       amRegistrationResponse.getQueue(), appContext.getUser(),
-                      homeSubClusterId.toString());
+                      homeSubClusterId.toString(), registryClient != null);
+
+                  uamResponse = uamPool.registerApplicationMaster(subClusterId,
+                      registerRequest);
                 } catch (Throwable e) {
                   LOG.error("Failed to register application master: "
                       + subClusterId + " Application: "
                       + appContext.getApplicationAttemptId(), e);
                 }
                 return new RegisterApplicationMasterResponseInfo(uamResponse,
-                    SubClusterId.newInstance(subClusterId));
+                    SubClusterId.newInstance(subClusterId), token);
               }
             });
       }
@@ -752,6 +924,14 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
                 + getApplicationContext().getApplicationAttemptId());
             successfulRegistrations.put(uamResponse.getSubClusterId(),
                 uamResponse.getResponse());
+
+            if (registryClient != null) {
+              registryClient.writeAMRMTokenForUAM(
+                  getApplicationContext().getApplicationAttemptId()
+                      .getApplicationId(),
+                  uamResponse.getSubClusterId().getId(),
+                  uamResponse.getUamToken());
+            }
           }
         } catch (Exception e) {
           LOG.warn("Failed to register unmanaged application master: "
@@ -1087,11 +1267,14 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
   private static class RegisterApplicationMasterResponseInfo {
     private RegisterApplicationMasterResponse response;
     private SubClusterId subClusterId;
+    private Token<AMRMTokenIdentifier> uamToken;
 
     RegisterApplicationMasterResponseInfo(
-        RegisterApplicationMasterResponse response, SubClusterId subClusterId) 
{
+        RegisterApplicationMasterResponse response, SubClusterId subClusterId,
+        Token<AMRMTokenIdentifier> uamToken) {
       this.response = response;
       this.subClusterId = subClusterId;
+      this.uamToken = uamToken;
     }
 
     public RegisterApplicationMasterResponse getResponse() {
@@ -1101,6 +1284,10 @@ public class FederationInterceptor extends 
AbstractRequestInterceptor {
     public SubClusterId getSubClusterId() {
       return subClusterId;
     }
+
+    public Token<AMRMTokenIdentifier> getUamToken() {
+      return uamToken;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 831ba0b..44bfc68 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -353,10 +353,6 @@ public class ContainerManagerImpl extends CompositeService 
implements
       rsrcLocalizationSrvc.recoverLocalizedResources(
           stateStore.loadLocalizationState());
 
-      if (this.amrmProxyEnabled) {
-        this.getAMRMProxyService().recover();
-      }
-
       RecoveredApplicationsState appsState = 
stateStore.loadApplicationsState();
       for (ContainerManagerApplicationProto proto :
            appsState.getApplications()) {
@@ -373,6 +369,11 @@ public class ContainerManagerImpl extends CompositeService 
implements
         recoverContainer(rcs);
       }
 
+      // Recover AMRMProxy state after apps and containers are recovered
+      if (this.amrmProxyEnabled) {
+        this.getAMRMProxyService().recover();
+      }
+
       //Dispatching the RECOVERY_COMPLETED event through the dispatcher
       //so that all the paused, scheduled and queued containers will
       //be scheduled for execution on availability of resources.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 3c57496..da1d047 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -31,6 +31,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -56,10 +57,10 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import 
org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
-import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
@@ -179,6 +180,15 @@ public abstract class BaseAMRMProxyTest {
     return new NMContext(null, null, null, null, stateStore, false, this.conf);
   }
 
+  protected List<ContainerId> getCompletedContainerIds(
+      List<ContainerStatus> containerStatus) {
+    List<ContainerId> ret = new ArrayList<>();
+    for (ContainerStatus status : containerStatus) {
+      ret.add(status.getContainerId());
+    }
+    return ret;
+  }
+
   /**
    * This helper method will invoke the specified function in parallel for each
    * end point in the specified list using a thread pool and return the
@@ -623,7 +633,7 @@ public abstract class BaseAMRMProxyTest {
      */
     public void initApp(ApplicationAttemptId applicationId, String user) {
       super.initializePipeline(applicationId, user,
-          new Token<AMRMTokenIdentifier>(), null, null, false);
+          new Token<AMRMTokenIdentifier>(), null, null, false, null);
     }
 
     public void stopApp(ApplicationId applicationId) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
index 937ede5..b955311 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -444,7 +444,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest 
{
 
     applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2);
     getAMRMProxyService().initializePipeline(applicationAttemptId, user,
-        new Token<AMRMTokenIdentifier>(), null, null, false);
+        new Token<AMRMTokenIdentifier>(), null, null, false, null);
 
     RequestInterceptorChainWrapper chain2 =
         getAMRMProxyService().getPipelines().get(appId);
@@ -531,16 +531,14 @@ public class TestAMRMProxyService extends 
BaseAMRMProxyTest {
         "new AMRMToken from RM should have been nulled by AMRMProxyService",
         allocateResponse.getAMRMToken());
 
-    // The way the mock resource manager is setup, it will return the 
containers
-    // that were released in the response. This is done because the UAMs run
-    // asynchronously and we need to if all the resource managers received the
-    // release it. The containers sent by the mock resource managers will be
+    // We need to make sure all the resource managers received the
+    // release list. The containers sent by the mock resource managers will be
     // aggregated and returned back to us and we can assert if all the release
     // lists reached the sub-clusters
-    List<Container> containersForReleasedContainerIds =
-        new ArrayList<Container>();
-    containersForReleasedContainerIds.addAll(allocateResponse
-        .getAllocatedContainers());
+    List<ContainerId> containersForReleasedContainerIds = new ArrayList<>();
+    List<ContainerId> newlyFinished = getCompletedContainerIds(
+        allocateResponse.getCompletedContainersStatuses());
+    containersForReleasedContainerIds.addAll(newlyFinished);
 
     // Send max 10 heart beats to receive all the containers. If not, we will
     // fail the test
@@ -554,8 +552,9 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest 
{
           "new AMRMToken from RM should have been nulled by AMRMProxyService",
           allocateResponse.getAMRMToken());
 
-      containersForReleasedContainerIds.addAll(allocateResponse
-          .getAllocatedContainers());
+      newlyFinished = getCompletedContainerIds(
+          allocateResponse.getCompletedContainersStatuses());
+      containersForReleasedContainerIds.addAll(newlyFinished);
 
       LOG.info("Number of containers received in this request: "
           + Integer.toString(allocateResponse.getAllocatedContainers()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 3db0e35..aa7ed69 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -19,16 +19,20 @@
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -59,6 +63,10 @@ import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -79,7 +87,10 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
 
   private TestableFederationInterceptor interceptor;
   private MemoryFederationStateStore stateStore;
+  private NMStateStoreService nmStateStore;
+  private RegistryOperations registry;
 
+  private Context nmContext;
   private int testAppId;
   private ApplicationAttemptId attemptId;
 
@@ -93,15 +104,28 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
     FederationStateStoreFacade.getInstance().reinitialize(stateStore,
         getConf());
 
+    nmStateStore = new NMMemoryStateStoreService();
+    nmStateStore.init(getConf());
+    nmStateStore.start();
+
+    registry = new FSRegistryOperationsService();
+    registry.init(getConf());
+    registry.start();
+
     testAppId = 1;
     attemptId = getApplicationAttemptId(testAppId);
-    interceptor.init(new AMRMProxyApplicationContextImpl(null, getConf(),
-        attemptId, "test-user", null, null));
+    nmContext =
+        new NMContext(null, null, null, null, nmStateStore, false, getConf());
+    interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(),
+        attemptId, "test-user", null, null, null, registry));
+    interceptor.cleanupRegistry();
   }
 
   @Override
   public void tearDown() {
+    interceptor.cleanupRegistry();
     interceptor.shutdown();
+    registry.stop();
     super.tearDown();
   }
 
@@ -207,18 +231,17 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
     AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
     Assert.assertNotNull(allocateResponse);
 
-    // The way the mock resource manager is setup, it will return the 
containers
-    // that were released in the allocated containers. The release request will
-    // be split and handled by the corresponding UAM. The release containers
-    // returned by the mock resource managers will be aggregated and returned
-    // back to us and we can check if total request size and returned size are
-    // the same
-    List<Container> containersForReleasedContainerIds =
-        new ArrayList<Container>();
-    containersForReleasedContainerIds
-        .addAll(allocateResponse.getAllocatedContainers());
+    // The release request will be split and handled by the corresponding UAM.
+    // The release containers returned by the mock resource managers will be
+    // aggregated and returned back to us and we can check if total request 
size
+    // and returned size are the same
+    List<ContainerId> containersForReleasedContainerIds =
+        new ArrayList<ContainerId>();
+    List<ContainerId> newlyFinished = getCompletedContainerIds(
+        allocateResponse.getCompletedContainersStatuses());
+    containersForReleasedContainerIds.addAll(newlyFinished);
     LOG.info("Number of containers received in the original request: "
-        + Integer.toString(allocateResponse.getAllocatedContainers().size()));
+        + Integer.toString(newlyFinished.size()));
 
     // Send max 10 heart beats to receive all the containers. If not, we will
     // fail the test
@@ -228,11 +251,12 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
       allocateResponse =
           interceptor.allocate(Records.newRecord(AllocateRequest.class));
       Assert.assertNotNull(allocateResponse);
-      containersForReleasedContainerIds
-          .addAll(allocateResponse.getAllocatedContainers());
+      newlyFinished = getCompletedContainerIds(
+          allocateResponse.getCompletedContainersStatuses());
+      containersForReleasedContainerIds.addAll(newlyFinished);
 
       LOG.info("Number of containers received in this request: "
-          + 
Integer.toString(allocateResponse.getAllocatedContainers().size()));
+          + Integer.toString(newlyFinished.size()));
       LOG.info("Total number of containers received: "
           + Integer.toString(containersForReleasedContainerIds.size()));
       Thread.sleep(10);
@@ -547,4 +571,74 @@ public class TestFederationInterceptor extends 
BaseAMRMProxyTest {
     Assert.assertEquals(1, response.getUpdatedContainers().size());
     Assert.assertEquals(1, response.getUpdateErrors().size());
   }
+
+  @Test
+  public void testSecondAttempt() throws Exception {
+    ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId);
+    userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        // Register the application
+        RegisterApplicationMasterRequest registerReq =
+            Records.newRecord(RegisterApplicationMasterRequest.class);
+        registerReq.setHost(Integer.toString(testAppId));
+        registerReq.setRpcPort(testAppId);
+        registerReq.setTrackingUrl("");
+
+        RegisterApplicationMasterResponse registerResponse =
+            interceptor.registerApplicationMaster(registerReq);
+        Assert.assertNotNull(registerResponse);
+
+        Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+        // Allocate one batch of containers
+        registerSubCluster(SubClusterId.newInstance("SC-1"));
+        registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
+
+        int numberOfContainers = 3;
+        List<Container> containers =
+            getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+        for (Container c : containers) {
+          System.out.println(c.getId() + " ha");
+        }
+        Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+        // Preserve the mock RM instances for secondaries
+        ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
+            interceptor.getSecondaryRMs();
+
+        // Increase the attemptId and create a new interceptor instance for it
+        attemptId = ApplicationAttemptId.newInstance(
+            attemptId.getApplicationId(), attemptId.getAttemptId() + 1);
+
+        interceptor = new TestableFederationInterceptor(null, secondaries);
+        interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
+            getConf(), attemptId, "test-user", null, null, null, registry));
+        registerResponse = interceptor.registerApplicationMaster(registerReq);
+
+        // Should re-attach secondaries and get the three running containers
+        Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+        Assert.assertEquals(numberOfContainers,
+            registerResponse.getContainersFromPreviousAttempts().size());
+
+        // Release all containers
+        releaseContainersAndAssert(
+            registerResponse.getContainersFromPreviousAttempts());
+
+        // Finish the application
+        FinishApplicationMasterRequest finishReq =
+            Records.newRecord(FinishApplicationMasterRequest.class);
+        finishReq.setDiagnostics("");
+        finishReq.setTrackingUrl("");
+        finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+        FinishApplicationMasterResponse finshResponse =
+            interceptor.finishApplicationMaster(finishReq);
+        Assert.assertNotNull(finshResponse);
+        Assert.assertEquals(true, finshResponse.getIsUnregistered());
+        return null;
+      }
+    });
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index d4b8735..23c80ae 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -44,6 +44,15 @@ public class TestableFederationInterceptor extends 
FederationInterceptor {
   private AtomicInteger runningIndex = new AtomicInteger(0);
   private MockResourceManagerFacade mockRm;
 
+  public TestableFederationInterceptor() {
+  }
+
+  public TestableFederationInterceptor(MockResourceManagerFacade homeRM,
+      ConcurrentHashMap<String, MockResourceManagerFacade> secondaries) {
+    mockRm = homeRM;
+    secondaryResourceManagers = secondaries;
+  }
+
   @Override
   protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
       ExecutorService threadPool) {
@@ -68,7 +77,7 @@ public class TestableFederationInterceptor extends 
FederationInterceptor {
     // We create one instance of the mock resource manager per sub cluster. 
Keep
     // track of the instances of the RMs in the map keyed by the sub cluster id
     synchronized (this.secondaryResourceManagers) {
-      if (this.secondaryResourceManagers.contains(subClusterId)) {
+      if (this.secondaryResourceManagers.containsKey(subClusterId)) {
         return (T) this.secondaryResourceManagers.get(subClusterId);
       } else {
         // The running index here is used to simulate different RM_EPOCH to
@@ -91,6 +100,15 @@ public class TestableFederationInterceptor extends 
FederationInterceptor {
     }
   }
 
+  protected MockResourceManagerFacade getHomeRM() {
+    return mockRm;
+  }
+
+  protected ConcurrentHashMap<String, MockResourceManagerFacade>
+      getSecondaryRMs() {
+    return secondaryResourceManagers;
+  }
+
   /**
    * Extends the UnmanagedAMPoolManager and overrides methods to provide a
    * testable implementation of UnmanagedAMPoolManager.
@@ -104,9 +122,9 @@ public class TestableFederationInterceptor extends 
FederationInterceptor {
     @Override
     public UnmanagedApplicationManager createUAM(Configuration conf,
         ApplicationId appId, String queueName, String submitter,
-        String appNameSuffix) {
+        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) 
{
       return new TestableUnmanagedApplicationManager(conf, appId, queueName,
-          submitter, appNameSuffix);
+          submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
     }
   }
 
@@ -119,8 +137,9 @@ public class TestableFederationInterceptor extends 
FederationInterceptor {
 
     public TestableUnmanagedApplicationManager(Configuration conf,
         ApplicationId appId, String queueName, String submitter,
-        String appNameSuffix) {
-      super(conf, appId, queueName, submitter, appNameSuffix);
+        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) 
{
+      super(conf, appId, queueName, submitter, appNameSuffix,
+          keepContainersAcrossApplicationAttempts);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 7a225c8..8245fd6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.service.AbstractService;
@@ -959,9 +960,10 @@ public class MiniYARNCluster extends CompositeService {
     protected void initializePipeline(ApplicationAttemptId 
applicationAttemptId,
         String user, Token<AMRMTokenIdentifier> amrmToken,
         Token<AMRMTokenIdentifier> localToken,
-        Map<String, byte[]> recoveredDataMap, boolean isRecovery) {
+        Map<String, byte[]> recoveredDataMap, boolean isRecovery,
+        Credentials credentials) {
       super.initializePipeline(applicationAttemptId, user, amrmToken,
-          localToken, recoveredDataMap, isRecovery);
+          localToken, recoveredDataMap, isRecovery, credentials);
       RequestInterceptor rt = getPipelines()
           .get(applicationAttemptId.getApplicationId()).getRootInterceptor();
       // The DefaultRequestInterceptor will generally be the last

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
index ef0f713..40d46cb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
@@ -141,6 +141,8 @@ The figure shows a sequence diagram for the following job 
execution flow:
     b. The security tokens are also modified by the NM when launching the AM, 
so that the AM can only talk with the AMRMProxy. Any future communication from 
AM to the YARN RM is mediated by the AMRMProxy.
 7. The AM will then request containers using the locality information exposed 
by HDFS.
 8. Based on a policy the AMRMProxy can impersonate the AM on other 
sub-clusters, by submitting an Unmanaged AM, and by forwarding the AM 
heartbeats to relevant sub-clusters.
+    a. Federation supports multiple application attempts with AMRMProxy HA. AM 
containers will have different attempt id in home sub-cluster, but the same 
Unmanaged AM in secondaries will be used across attempts.
+    b. When AMRMProxy HA is enabled, UAM tokens will be stored in the Yarn 
Registry. In the registerApplicationMaster call of each application attempt, 
AMRMProxy will fetch existing UAM tokens from the registry (if any) and 
re-attach to the existing UAMs.
 9. The AMRMProxy will use both locality information and a pluggable policy 
configured in the state-store to decide whether to forward the resource 
requests received by the AM to the Home RM or to one (or more) Secondary RMs. 
In Figure 1, we show the case in which the AMRMProxy decides to forward the 
request to the secondary RM.
 10. The secondary RM will provide the AMRMProxy with valid container tokens to 
start a new container on some node in its sub-cluster. This mechanism ensures 
that each sub-cluster uses its own security tokens and avoids the need for a 
cluster wide shared secret to create tokens.
 11. The AMRMProxy forwards the allocation response back to the AM.
@@ -262,16 +264,17 @@ These are extra configurations that should appear in the 
**conf/yarn-site.xml**
 
 | Property | Example | Description |
 |:---- |:---- |
-| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy 
is enabled.
-|`yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | 
`org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A 
comma-separated list of interceptors to be run at the amrmproxy. For federation 
the last step in the pipeline should be the FederationInterceptor.
+| `yarn.nodemanager.amrmproxy.enabled` | `true` | Whether or not the AMRMProxy 
is enabled. |
+| `yarn.nodemanager.amrmproxy.interceptor-class.pipeline` | 
`org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor` | A 
comma-separated list of interceptors to be run at the amrmproxy. For federation 
the last step in the pipeline should be the FederationInterceptor. |
 | `yarn.client.failover-proxy-provider` | 
`org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider`
 | The class used to connect to the RMs by looking up the membership 
information in federation state-store. This must be set if federation is 
enabled, even if RM HA is not enabled.|
 
 Optional:
 
 | Property | Example | Description |
 |:---- |:---- |
-|`yarn.federation.statestore.max-connections` | `1` | The maximum number of 
parallel connections from each AMRMProxy to the state-store. This value is 
typically lower than the router one, since we have many AMRMProxy that could 
burn-through many DB connections quickly. |
-|`yarn.federation.cache-ttl.secs` | `300` | The time to leave for the 
AMRMProxy cache. Typically larger than at the router, as the number of 
AMRMProxy is large, and we want to limit the load to the centralized 
state-store. |
+| `yarn.nodemanager.amrmproxy.ha.enable` | `true` | Whether or not the 
AMRMProxy HA is enabled for multiple application attempts support. |
+| `yarn.federation.statestore.max-connections` | `1` | The maximum number of 
parallel connections from each AMRMProxy to the state-store. This value is 
typically lower than the router one, since we have many AMRMProxy that could 
burn-through many DB connections quickly. |
+| `yarn.federation.cache-ttl.secs` | `300` | The time to live for the 
AMRMProxy cache. Typically larger than at the router, as the number of 
AMRMProxy is large, and we want to limit the load to the centralized 
state-store. |
 
 Running a Sample Job
 --------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to