YARN-6128. Add support for AMRMProxy HA. (Botong Huang via Subru).

(cherry picked from commit ed3109136100a21d971484f242d80f2a7e7d337d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a6db6c98
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a6db6c98
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a6db6c98

Branch: refs/heads/branch-2.9
Commit: a6db6c9855e2e5dfb36165209f497ad2bf743bb3
Parents: 49ba091
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Nov 20 14:21:58 2017 -0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon Nov 20 14:22:59 2017 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  13 +
 .../src/main/resources/yarn-default.xml         |  21 ++
 .../hadoop-yarn-server-common/pom.xml           |   5 +
 .../utils/FederationRegistryClient.java         | 338 +++++++++++++++++++
 .../yarn/server/uam/UnmanagedAMPoolManager.java | 140 ++++++--
 .../server/uam/UnmanagedApplicationManager.java | 212 +++++++-----
 .../yarn/server/utils/AMRMClientUtils.java      |  30 +-
 .../yarn/server/MockResourceManagerFacade.java  | 100 +++---
 .../utils/TestFederationRegistryClient.java     |  90 +++++
 .../uam/TestUnmanagedApplicationManager.java    | 100 +++++-
 .../amrmproxy/AMRMProxyApplicationContext.java  |  16 +
 .../AMRMProxyApplicationContextImpl.java        |  35 +-
 .../nodemanager/amrmproxy/AMRMProxyService.java |  83 ++++-
 .../amrmproxy/FederationInterceptor.java        | 221 +++++++++++-
 .../containermanager/ContainerManagerImpl.java  |   9 +-
 .../amrmproxy/BaseAMRMProxyTest.java            |  12 +-
 .../amrmproxy/TestAMRMProxyService.java         |  21 +-
 .../amrmproxy/TestFederationInterceptor.java    | 126 ++++++-
 .../TestableFederationInterceptor.java          |  29 +-
 .../hadoop/yarn/server/MiniYARNCluster.java     |   6 +-
 .../src/site/markdown/Federation.md             |  11 +-
 21 files changed, 1341 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a97dc57..edeec9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1948,6 +1948,9 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
       "org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
           + "DefaultRequestInterceptor";
+  public static final String AMRM_PROXY_HA_ENABLED = NM_PREFIX
+      + "amrmproxy.ha.enable";
+  public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false;
 
   /**
    * Default platform-agnostic CLASSPATH for YARN applications. A
@@ -2790,6 +2793,11 @@ public class YarnConfiguration extends Configuration {
   public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
       FEDERATION_PREFIX + "cache-ttl.secs";
 
+  public static final String FEDERATION_REGISTRY_BASE_KEY =
+      FEDERATION_PREFIX + "registry.base-dir";
+  public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
+      "yarnfederation/";
+
   // 5 minutes
   public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
 
@@ -2947,6 +2955,11 @@ public class YarnConfiguration extends Configuration {
   // Other Configs
   ////////////////////////////////
 
+  public static final String YARN_REGISTRY_CLASS =
+      YARN_PREFIX + "registry.class";
+  public static final String DEFAULT_YARN_REGISTRY_CLASS =
+      "org.apache.hadoop.registry.client.impl.FSRegistryOperationsService";
+
   /**
    * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead.
    * The interval of the yarn client's querying application state after

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 46fb7c7..71dd72a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2815,7 +2815,20 @@
    <value>300</value>
   </property>
 
+  <property>
+    <description>The registry base directory for federation.</description>
+    <name>yarn.federation.registry.base-dir</name>
+    <value>yarnfederation/</value>
+  </property>
+
   <!-- Other Configuration -->
+
+  <property>
+    <description>The registry implementation to use.</description>
+    <name>yarn.registry.class</name>
+    <value>org.apache.hadoop.registry.client.impl.FSRegistryOperationsService</value>
+  </property>
+
   <property>
     <description>The interval that the yarn client library uses to poll the
     completion status of the asynchronous API of application client protocol.
@@ -2978,6 +2991,14 @@
 
   <property>
     <description>
+    Whether AMRMProxy HA is enabled.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.ha.enable</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
     Setting that controls whether distributed scheduling is enabled.
     </description>
     <name>yarn.nodemanager.distributed-scheduling.enabled</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 4eb9130..735f6c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -68,6 +68,11 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-registry</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
new file mode 100644
index 0000000..6624318
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.registry.client.api.BindFlags;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Helper class that handles reads and writes to Yarn Registry to support UAM HA
+ * and second attempt.
+ */
+public class FederationRegistryClient {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationRegistryClient.class);
+
+  private RegistryOperations registry;
+
+  private UserGroupInformation user;
+
+  // AppId -> SubClusterId -> UAM token
+  private Map<ApplicationId, Map<String, Token<AMRMTokenIdentifier>>>
+      appSubClusterTokenMap;
+
+  // Structure in registry: <registryBaseDir>/<AppId>/<SubClusterId> -> UAMToken
+  private String registryBaseDir;
+
+  public FederationRegistryClient(Configuration conf,
+      RegistryOperations registry, UserGroupInformation user) {
+    this.registry = registry;
+    this.user = user;
+    this.appSubClusterTokenMap = new ConcurrentHashMap<>();
+    this.registryBaseDir =
+        conf.get(YarnConfiguration.FEDERATION_REGISTRY_BASE_KEY,
+            YarnConfiguration.DEFAULT_FEDERATION_REGISTRY_BASE_KEY);
+    LOG.info("Using registry {} with base directory: {}",
+        this.registry.getClass().getName(), this.registryBaseDir);
+  }
+
+  /**
+   * Get the list of known applications in the registry.
+   *
+   * @return the list of known applications
+   */
+  public List<String> getAllApplications() {
+    // Suppress the exception here because it is valid that the entry does not
+    // exist
+    List<String> applications = null;
+    try {
+      applications = listDirRegistry(this.registry, this.user,
+          getRegistryKey(null, null), false);
+    } catch (YarnException e) {
+      LOG.warn("Unexpected exception from listDirRegistry", e);
+    }
+    if (applications == null) {
+      // It is valid for listDirRegistry to return null
+      return new ArrayList<>();
+    }
+    return applications;
+  }
+
+  /**
+   * For testing, delete all application records in registry.
+   */
+  @VisibleForTesting
+  public void cleanAllApplications() {
+    try {
+      removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null),
+          true, false);
+    } catch (YarnException e) {
+      LOG.warn("Unexpected exception from removeKeyRegistry", e);
+    }
+  }
+
+  /**
+   * Write/update the UAM token for an application and a sub-cluster.
+   *
+   * @param subClusterId sub-cluster id of the token
+   * @param token the UAM token of the application
+   * @return whether the amrmToken is added or updated to a new value
+   */
+  public boolean writeAMRMTokenForUAM(ApplicationId appId,
+      String subClusterId, Token<AMRMTokenIdentifier> token) {
+    Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
+        this.appSubClusterTokenMap.get(appId);
+    if (subClusterTokenMap == null) {
+      subClusterTokenMap = new ConcurrentHashMap<>();
+      this.appSubClusterTokenMap.put(appId, subClusterTokenMap);
+    }
+
+    boolean update = !token.equals(subClusterTokenMap.get(subClusterId));
+    if (!update) {
+      LOG.debug("Same amrmToken received from {}, skip writing registry for {}",
+          subClusterId, appId);
+      return update;
+    }
+
+    LOG.info("Writing/Updating amrmToken for {} to registry for {}",
+        subClusterId, appId);
+    try {
+      // First, write the token entry
+      writeRegistry(this.registry, this.user,
+          getRegistryKey(appId, subClusterId), token.encodeToUrlString(), true);
+
+      // Then update the subClusterTokenMap
+      subClusterTokenMap.put(subClusterId, token);
+    } catch (YarnException | IOException e) {
+      LOG.error(
+          "Failed writing AMRMToken to registry for subcluster " + subClusterId,
+          e);
+    }
+    return update;
+  }
+
+  /**
+   * Load the information of one application from registry.
+   *
+   * @param appId application id
+   * @return the sub-cluster to UAM token mapping
+   */
+  public Map<String, Token<AMRMTokenIdentifier>>
+      loadStateFromRegistry(ApplicationId appId) {
+    Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>();
+    // Suppress the exception here because it is valid that the entry does not
+    // exist
+    List<String> subclusters = null;
+    try {
+      subclusters = listDirRegistry(this.registry, this.user,
+          getRegistryKey(appId, null), false);
+    } catch (YarnException e) {
+      LOG.warn("Unexpected exception from listDirRegistry", e);
+    }
+
+    if (subclusters == null) {
+      LOG.info("Application {} does not exist in registry", appId);
+      return retMap;
+    }
+
+    // Read the amrmToken for each sub-cluster with an existing UAM
+    for (String scId : subclusters) {
+      LOG.info("Reading amrmToken for subcluster {} for {}", scId, appId);
+      String key = getRegistryKey(appId, scId);
+      try {
+        String tokenString = readRegistry(this.registry, this.user, key, true);
+        if (tokenString == null) {
+          throw new YarnException("Null string from readRegistry key " + key);
+        }
+        Token<AMRMTokenIdentifier> amrmToken = new Token<>();
+        amrmToken.decodeFromUrlString(tokenString);
+        // Clear the service field, as if RM just issued the token
+        amrmToken.setService(new Text());
+
+        retMap.put(scId, amrmToken);
+      } catch (Exception e) {
+        LOG.error("Failed reading registry key " + key
+            + ", skipping subcluster " + scId, e);
+      }
+    }
+
+    // Override existing map if there
+    this.appSubClusterTokenMap.put(appId, new ConcurrentHashMap<>(retMap));
+    return retMap;
+  }
+
+  /**
+   * Remove an application from registry.
+   *
+   * @param appId application id
+   */
+  public void removeAppFromRegistry(ApplicationId appId) {
+    Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
+        this.appSubClusterTokenMap.get(appId);
+    LOG.info("Removing all registry entries for {}", appId);
+
+    if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) {
+      return;
+    }
+
+    // Remove the application directory and everything under it
+    String key = getRegistryKey(appId, null);
+    try {
+      removeKeyRegistry(this.registry, this.user, key, true, true);
+      subClusterTokenMap.clear();
+    } catch (YarnException e) {
+      LOG.error("Failed removing registry directory key " + key, e);
+    }
+  }
+
+  private String getRegistryKey(ApplicationId appId, String fileName) {
+    if (appId == null) {
+      return this.registryBaseDir;
+    }
+    if (fileName == null) {
+      return this.registryBaseDir + appId.toString();
+    }
+    return this.registryBaseDir + appId.toString() + "/" + fileName;
+  }
+
+  private String readRegistry(final RegistryOperations registryImpl,
+      UserGroupInformation ugi, final String key, final boolean throwIfFails)
+      throws YarnException {
+    // Use the ugi loaded with app credentials to access registry
+    String result = ugi.doAs(new PrivilegedAction<String>() {
+      @Override
+      public String run() {
+        try {
+          ServiceRecord value = registryImpl.resolve(key);
+          if (value != null) {
+            return value.description;
+          }
+        } catch (Throwable e) {
+          if (throwIfFails) {
+            LOG.error("Registry resolve key " + key + " failed", e);
+          }
+        }
+        return null;
+      }
+    });
+    if (result == null && throwIfFails) {
+      throw new YarnException("Registry resolve key " + key + " failed");
+    }
+    return result;
+  }
+
+  private void removeKeyRegistry(final RegistryOperations registryImpl,
+      UserGroupInformation ugi, final String key, final boolean recursive,
+      final boolean throwIfFails) throws YarnException {
+    // Use the ugi loaded with app credentials to access registry
+    boolean success = ugi.doAs(new PrivilegedAction<Boolean>() {
+      @Override
+      public Boolean run() {
+        try {
+          registryImpl.delete(key, recursive);
+          return true;
+        } catch (Throwable e) {
+          if (throwIfFails) {
+            LOG.error("Registry remove key " + key + " failed", e);
+          }
+        }
+        return false;
+      }
+    });
+    if (!success && throwIfFails) {
+      throw new YarnException("Registry remove key " + key + " failed");
+    }
+  }
+
+  /**
+   * Write registry entry, override if exists.
+   */
+  private void writeRegistry(final RegistryOperations registryImpl,
+      UserGroupInformation ugi, final String key, final String value,
+      final boolean throwIfFails) throws YarnException {
+
+    final ServiceRecord recordValue = new ServiceRecord();
+    recordValue.description = value;
+    // Use the ugi loaded with app credentials to access registry
+    boolean success = ugi.doAs(new PrivilegedAction<Boolean>() {
+      @Override
+      public Boolean run() {
+        try {
+          registryImpl.bind(key, recordValue, BindFlags.OVERWRITE);
+          return true;
+        } catch (Throwable e) {
+          if (throwIfFails) {
+            LOG.error("Registry write key " + key + " failed", e);
+          }
+        }
+        return false;
+      }
+    });
+    if (!success && throwIfFails) {
+      throw new YarnException("Registry write key " + key + " failed");
+    }
+  }
+
+  /**
+   * List the sub directories in the given directory.
+   */
+  private List<String> listDirRegistry(final RegistryOperations registryImpl,
+      UserGroupInformation ugi, final String key, final boolean throwIfFails)
+      throws YarnException {
+    List<String> result = ugi.doAs(new PrivilegedAction<List<String>>() {
+      @Override
+      public List<String> run() {
+        try {
+          return registryImpl.list(key);
+        } catch (Throwable e) {
+          if (throwIfFails) {
+            LOG.error("Registry list key " + key + " failed", e);
+          }
+        }
+        return null;
+      }
+    });
+    if (result == null && throwIfFails) {
+      throw new YarnException("Registry list key " + key + " failed");
+    }
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 08aee77..0c01217 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -33,6 +33,7 @@ import 
org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -44,9 +45,9 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.slf4j.Logger;
@@ -67,7 +68,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
   // Map from uamId to UAM instances
   private Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap;
 
-  private Map<String, ApplicationAttemptId> attemptIdMap;
+  private Map<String, ApplicationId> appIdMap;
 
   private ExecutorService threadpool;
 
@@ -82,7 +83,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
       this.threadpool = Executors.newCachedThreadPool();
     }
     this.unmanagedAppMasterMap = new ConcurrentHashMap<>();
-    this.attemptIdMap = new ConcurrentHashMap<>();
+    this.appIdMap = new ConcurrentHashMap<>();
     super.serviceStart();
   }
 
@@ -114,7 +115,7 @@ public class UnmanagedAMPoolManager extends AbstractService 
{
         public KillApplicationResponse call() throws Exception {
           try {
             LOG.info("Force-killing UAM id " + uamId + " for application "
-                + attemptIdMap.get(uamId));
+                + appIdMap.get(uamId));
             return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
           } catch (Exception e) {
             LOG.error("Failed to kill unmanaged application master", e);
@@ -132,7 +133,7 @@ public class UnmanagedAMPoolManager extends AbstractService 
{
         LOG.error("Failed to kill unmanaged application master", e);
       }
     }
-    this.attemptIdMap.clear();
+    this.appIdMap.clear();
     super.serviceStop();
   }
 
@@ -145,13 +146,18 @@ public class UnmanagedAMPoolManager extends 
AbstractService {
    * @param queueName queue of the application
    * @param submitter submitter name of the UAM
    * @param appNameSuffix application name suffix for the UAM
+   * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
+   *          recovery.
+   * @see ApplicationSubmissionContext
+   *          #setKeepContainersAcrossApplicationAttempts(boolean)
    * @return uamId for the UAM
    * @throws YarnException if registerApplicationMaster fails
    * @throws IOException if registerApplicationMaster fails
    */
   public String createAndRegisterNewUAM(
       RegisterApplicationMasterRequest registerRequest, Configuration conf,
-      String queueName, String submitter, String appNameSuffix)
+      String queueName, String submitter, String appNameSuffix,
+      boolean keepContainersAcrossApplicationAttempts)
       throws YarnException, IOException {
     ApplicationId appId = null;
     ApplicationClientProtocol rmClient;
@@ -173,45 +179,93 @@ public class UnmanagedAMPoolManager extends 
AbstractService {
       rmClient = null;
     }
 
-    createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId,
-        queueName, submitter, appNameSuffix);
+    // Launch the UAM in RM
+    launchUAM(appId.toString(), conf, appId, queueName, submitter,
+        appNameSuffix, keepContainersAcrossApplicationAttempts);
+
+    // Register the UAM application
+    registerApplicationMaster(appId.toString(), registerRequest);
+
+    // Returns the appId as uamId
     return appId.toString();
   }
 
   /**
-   * Create a new UAM and register the application, using the provided uamId and
-   * appId.
+   * Launch a new UAM, using the provided uamId and appId.
    *
-   * @param uamId identifier for the UAM
-   * @param registerRequest RegisterApplicationMasterRequest
+   * @param uamId uam Id
    * @param conf configuration for this UAM
    * @param appId application id for the UAM
    * @param queueName queue of the application
    * @param submitter submitter name of the UAM
    * @param appNameSuffix application name suffix for the UAM
-   * @return RegisterApplicationMasterResponse
-   * @throws YarnException if registerApplicationMaster fails
-   * @throws IOException if registerApplicationMaster fails
+   * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
+   *          recovery.
+   * @see ApplicationSubmissionContext
+   *          #setKeepContainersAcrossApplicationAttempts(boolean)
+   * @return UAM token
+   * @throws YarnException if fails
+   * @throws IOException if fails
    */
-  public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId,
-      RegisterApplicationMasterRequest registerRequest, Configuration conf,
+  public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf,
+      ApplicationId appId, String queueName, String submitter,
+      String appNameSuffix, boolean keepContainersAcrossApplicationAttempts)
+      throws YarnException, IOException {
+
+    if (this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " already exists");
+    }
+    UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
+        submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
+    // Put the UAM into map first before initializing it to avoid additional UAM
+    // for the same uamId being created concurrently
+    this.unmanagedAppMasterMap.put(uamId, uam);
+
+    Token<AMRMTokenIdentifier> amrmToken = null;
+    try {
+      LOG.info("Launching UAM id {} for application {}", uamId, appId);
+      amrmToken = uam.launchUAM();
+    } catch (Exception e) {
+      // Add the map earlier and remove here if register failed because we want
+      // to make sure there is only one uam instance per uamId at any given time
+      this.unmanagedAppMasterMap.remove(uamId);
+      throw e;
+    }
+
+    this.appIdMap.put(uamId, uam.getAppId());
+    return amrmToken;
+  }
+
+  /**
+   * Re-attach to an existing UAM, using the provided uamId.
+   *
+   * @param uamId uam Id
+   * @param conf configuration for this UAM
+   * @param appId application id for the UAM
+   * @param queueName queue of the application
+   * @param submitter submitter name of the UAM
+   * @param appNameSuffix application name suffix for the UAM
+   * @param uamToken UAM token
+   * @throws YarnException if fails
+   * @throws IOException if fails
+   */
+  public void reAttachUAM(String uamId, Configuration conf,
       ApplicationId appId, String queueName, String submitter,
-      String appNameSuffix) throws YarnException, IOException {
+      String appNameSuffix, Token<AMRMTokenIdentifier> uamToken)
+      throws YarnException, IOException {
 
     if (this.unmanagedAppMasterMap.containsKey(uamId)) {
       throw new YarnException("UAM " + uamId + " already exists");
     }
     UnmanagedApplicationManager uam =
-        createUAM(conf, appId, queueName, submitter, appNameSuffix);
+        createUAM(conf, appId, queueName, submitter, appNameSuffix, true);
     // Put the UAM into map first before initializing it to avoid additional UAM
     // for the same uamId being created concurrently
     this.unmanagedAppMasterMap.put(uamId, uam);
 
-    RegisterApplicationMasterResponse response = null;
     try {
-      LOG.info("Creating and registering UAM id {} for application {}", uamId,
-          appId);
-      response = uam.createAndRegisterApplicationMaster(registerRequest);
+      LOG.info("Reattaching UAM id {} for application {}", uamId, appId);
+      uam.reAttachUAM(uamToken);
     } catch (Exception e) {
       // Add the map earlier and remove here if register failed because we want
       // to make sure there is only one uam instance per uamId at any given time
@@ -219,8 +273,7 @@ public class UnmanagedAMPoolManager extends AbstractService 
{
       throw e;
     }
 
-    this.attemptIdMap.put(uamId, uam.getAttemptId());
-    return response;
+    this.appIdMap.put(uamId, uam.getAppId());
   }
 
   /**
@@ -231,20 +284,42 @@ public class UnmanagedAMPoolManager extends 
AbstractService {
    * @param queueName queue of the application
    * @param submitter submitter name of the application
    * @param appNameSuffix application name suffix
+   * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
    * @return the UAM instance
    */
   @VisibleForTesting
   protected UnmanagedApplicationManager createUAM(Configuration conf,
       ApplicationId appId, String queueName, String submitter,
-      String appNameSuffix) {
+      String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
     return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
-        appNameSuffix);
+        appNameSuffix, keepContainersAcrossApplicationAttempts);
+  }
+
+  /**
+   * Register application master for the UAM.
+   *
+   * @param uamId uam Id
+   * @param registerRequest RegisterApplicationMasterRequest
+   * @return register response
+   * @throws YarnException if register fails
+   * @throws IOException if register fails
+   */
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String uamId, RegisterApplicationMasterRequest registerRequest)
+      throws YarnException, IOException {
+    if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    LOG.info("Registering UAM id {} for application {}", uamId,
+        this.appIdMap.get(uamId));
+    return this.unmanagedAppMasterMap.get(uamId)
+        .registerApplicationMaster(registerRequest);
   }
 
   /**
    * AllocateAsync to an UAM.
    *
-   * @param uamId identifier for the UAM
+   * @param uamId uam Id
    * @param request AllocateRequest
    * @param callback callback for response
    * @throws YarnException if allocate fails
@@ -262,7 +337,7 @@ public class UnmanagedAMPoolManager extends AbstractService 
{
   /**
    * Finish an UAM/application.
    *
-   * @param uamId identifier for the UAM
+   * @param uamId uam Id
    * @param request FinishApplicationMasterRequest
    * @return FinishApplicationMasterResponse
    * @throws YarnException if finishApplicationMaster call fails
@@ -274,14 +349,15 @@ public class UnmanagedAMPoolManager extends 
AbstractService {
     if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
       throw new YarnException("UAM " + uamId + " does not exist");
     }
-    LOG.info("Finishing application for UAM id {} ", uamId);
+    LOG.info("Finishing UAM id {} for application {}", uamId,
+        this.appIdMap.get(uamId));
     FinishApplicationMasterResponse response =
         this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request);
 
     if (response.getIsUnregistered()) {
       // Only remove the UAM when the unregister finished
       this.unmanagedAppMasterMap.remove(uamId);
-      this.attemptIdMap.remove(uamId);
+      this.appIdMap.remove(uamId);
       LOG.info("UAM id {} is unregistered", uamId);
     }
     return response;
@@ -301,7 +377,7 @@ public class UnmanagedAMPoolManager extends AbstractService 
{
   /**
    * Return whether an UAM exists.
    *
-   * @param uamId identifier for the UAM
+   * @param uamId uam Id
    * @return UAM exists or not
    */
   public boolean hasUAMId(String uamId) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 6531a75..3f4a110 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -50,7 +50,9 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -90,7 +92,6 @@ public class UnmanagedApplicationManager {
   private AMRequestHandlerThread handlerThread;
   private ApplicationMasterProtocol rmProxy;
   private ApplicationId applicationId;
-  private ApplicationAttemptId attemptId;
   private String submitter;
   private String appNameSuffix;
   private Configuration conf;
@@ -101,9 +102,31 @@ public class UnmanagedApplicationManager {
   private ApplicationClientProtocol rmClient;
   private long asyncApiPollIntervalMillis;
   private RecordFactory recordFactory;
+  private boolean keepContainersAcrossApplicationAttempts;
 
+  /*
+   * This flag indicates that launchUAM/reAttachUAM has been called (and is
+   * perhaps still blocked in initializeUnmanagedAM below due to an RM
+   * connection/failover issue). The flag must be set before making the
+   * blocking call to the RM.
+   */
+  private boolean connectionInitiated;
+
+  /**
+   * Constructor.
+   *
+   * @param conf configuration
+   * @param appId application Id to use for this UAM
+   * @param queueName the queue of the UAM
+   * @param submitter user name of the app
+   * @param appNameSuffix the app name suffix to use
+   * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
+   *          recovery. See {@link ApplicationSubmissionContext
+   *          #setKeepContainersAcrossApplicationAttempts(boolean)}
+   */
   public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
-      String queueName, String submitter, String appNameSuffix) {
+      String queueName, String submitter, String appNameSuffix,
+      boolean keepContainersAcrossApplicationAttempts) {
     Preconditions.checkNotNull(conf, "Configuration cannot be null");
     Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
     Preconditions.checkNotNull(submitter, "App submitter cannot be null");
@@ -116,6 +139,7 @@ public class UnmanagedApplicationManager {
     this.handlerThread = new AMRequestHandlerThread();
     this.requestQueue = new LinkedBlockingQueue<>();
     this.rmProxy = null;
+    this.connectionInitiated = false;
     this.registerRequest = null;
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     this.asyncApiPollIntervalMillis = conf.getLong(
@@ -123,45 +147,84 @@ public class UnmanagedApplicationManager {
             YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
         YarnConfiguration.
             DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
+    this.keepContainersAcrossApplicationAttempts =
+        keepContainersAcrossApplicationAttempts;
+  }
+
+  /**
+   * Launch a new UAM in the resource manager.
+   *
+   * @return the AMRM token of the launched UAM
+   * @throws YarnException if fails
+   * @throws IOException if fails
+   */
+  public Token<AMRMTokenIdentifier> launchUAM()
+      throws YarnException, IOException {
+    this.connectionInitiated = true;
+
+    // Blocking call to RM
+    Token<AMRMTokenIdentifier> amrmToken =
+        initializeUnmanagedAM(this.applicationId);
+
+    // Creates the UAM connection
+    createUAMProxy(amrmToken);
+    return amrmToken;
+  }
+
+  /**
+   * Re-attach to an existing UAM in the resource manager.
+   *
+   * @param amrmToken the UAM token
+   * @throws IOException if re-attach fails
+   * @throws YarnException if re-attach fails
+   */
+  public void reAttachUAM(Token<AMRMTokenIdentifier> amrmToken)
+      throws IOException, YarnException {
+    this.connectionInitiated = true;
+
+    // Creates the UAM connection
+    createUAMProxy(amrmToken);
+  }
+
+  protected void createUAMProxy(Token<AMRMTokenIdentifier> amrmToken)
+      throws IOException {
+    this.userUgi = UserGroupInformation.createProxyUser(
+        this.applicationId.toString(), UserGroupInformation.getCurrentUser());
+    this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
+        this.userUgi, amrmToken);
   }
 
   /**
    * Registers this {@link UnmanagedApplicationManager} with the resource
    * manager.
    *
-   * @param request the register request
-   * @return the register response
+   * @param request RegisterApplicationMasterRequest
+   * @return register response
    * @throws YarnException if register fails
    * @throws IOException if register fails
    */
-  public RegisterApplicationMasterResponse createAndRegisterApplicationMaster(
+  public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request)
       throws YarnException, IOException {
-    // This need to be done first in this method, because it is used as an
-    // indication that this method is called (and perhaps blocked due to RM
-    // connection and not finished yet)
+    // Save the register request for re-register later
     this.registerRequest = request;
 
-    // attemptId will be available after this call
-    UnmanagedAMIdentifier identifier =
-        initializeUnmanagedAM(this.applicationId);
-
-    try {
-      this.userUgi = UserGroupInformation.createProxyUser(
-          identifier.getAttemptId().toString(),
-          UserGroupInformation.getCurrentUser());
-    } catch (IOException e) {
-      LOG.error("Exception while trying to get current user", e);
-      throw new YarnRuntimeException(e);
-    }
-
-    this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
-        this.userUgi, identifier.getToken());
-
-    LOG.info("Registering the Unmanaged application master {}", this.attemptId);
+    // Since keepContainersAcrossApplicationAttempts is set to true for UAM,
+    // we do not expect an "application already registered" exception here.
+    LOG.info("Registering the Unmanaged application master {}",
+        this.applicationId);
     RegisterApplicationMasterResponse response =
         this.rmProxy.registerApplicationMaster(this.registerRequest);
 
+    for (Container container : response.getContainersFromPreviousAttempts()) {
+      LOG.info("RegisterUAM returned existing running container "
+          + container.getId());
+    }
+    for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
+      LOG.info("RegisterUAM returned existing NM token for node "
+          + nmToken.getNodeId());
+    }
+
     // Only when register succeed that we start the heartbeat thread
     this.handlerThread.setUncaughtExceptionHandler(
         new HeartBeatThreadUncaughtExceptionHandler());
@@ -187,11 +250,11 @@ public class UnmanagedApplicationManager {
     this.handlerThread.shutdown();
 
     if (this.rmProxy == null) {
-      if (this.registerRequest != null) {
-        // This is possible if the async registerApplicationMaster is still
+      if (this.connectionInitiated) {
+        // This is possible if the async launchUAM is still
         // blocked and retrying. Return a dummy response in this case.
         LOG.warn("Unmanaged AM still not successfully launched/registered yet."
-            + " Stopping the UAM client thread anyways.");
+            + " Stopping the UAM heartbeat thread anyways.");
         return FinishApplicationMasterResponse.newInstance(false);
       } else {
         throw new YarnException("finishApplicationMaster should not "
@@ -199,7 +262,7 @@ public class UnmanagedApplicationManager {
       }
     }
     return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
-        this.registerRequest, this.attemptId);
+        this.registerRequest, this.applicationId);
   }
 
   /**
@@ -212,7 +275,7 @@ public class UnmanagedApplicationManager {
   public KillApplicationResponse forceKillApplication()
       throws IOException, YarnException {
     KillApplicationRequest request =
-        KillApplicationRequest.newInstance(this.attemptId.getApplicationId());
+        KillApplicationRequest.newInstance(this.applicationId);
 
     this.handlerThread.shutdown();
 
@@ -240,29 +303,29 @@ public class UnmanagedApplicationManager {
       LOG.debug("Interrupted while waiting to put on response queue", ex);
     }
     // Two possible cases why the UAM is not successfully registered yet:
-    // 1. registerApplicationMaster is not called at all. Should throw here.
-    // 2. registerApplicationMaster is called but hasn't successfully returned.
+    // 1. launchUAM is not called at all. Should throw here.
+    // 2. launchUAM is called but hasn't successfully returned.
     //
     // In case 2, we have already save the allocate request above, so if the
     // registration succeed later, no request is lost.
     if (this.rmProxy == null) {
-      if (this.registerRequest != null) {
+      if (this.connectionInitiated) {
         LOG.info("Unmanaged AM still not successfully launched/registered yet."
             + " Saving the allocate request and send later.");
       } else {
         throw new YarnException(
-            "AllocateAsync should not be called before createAndRegister");
+            "AllocateAsync should not be called before launchUAM");
       }
     }
   }
 
   /**
-   * Returns the application attempt id of the UAM.
+   * Returns the application id of the UAM.
    *
-   * @return attempt id of the UAM
+   * @return application id of the UAM
    */
-  public ApplicationAttemptId getAttemptId() {
-    return this.attemptId;
+  public ApplicationId getAppId() {
+    return this.applicationId;
   }
 
   /**
@@ -287,15 +350,15 @@ public class UnmanagedApplicationManager {
    * Launch and initialize an unmanaged AM. First, it creates a new application
    * on the RM and negotiates a new attempt id. Then it waits for the RM
    * application attempt state to reach YarnApplicationAttemptState.LAUNCHED
-   * after which it returns the AM-RM token and the attemptId.
+   * after which it returns the AM-RM token.
    *
    * @param appId application id
-   * @return the UAM identifier
+   * @return the UAM token
    * @throws IOException if initialize fails
    * @throws YarnException if initialize fails
    */
-  protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId)
-      throws IOException, YarnException {
+  protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
+      ApplicationId appId) throws IOException, YarnException {
     try {
       UserGroupInformation appSubmitter =
           UserGroupInformation.createRemoteUser(this.submitter);
@@ -306,13 +369,12 @@ public class UnmanagedApplicationManager {
       submitUnmanagedApp(appId);
 
       // Monitor the application attempt to wait for launch state
-      ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId,
+      monitorCurrentAppAttempt(appId,
           EnumSet.of(YarnApplicationState.ACCEPTED,
               YarnApplicationState.RUNNING, YarnApplicationState.KILLED,
               YarnApplicationState.FAILED, YarnApplicationState.FINISHED),
           YarnApplicationAttemptState.LAUNCHED);
-      this.attemptId = attemptReport.getApplicationAttemptId();
-      return getUAMIdentifier();
+      return getUAMToken();
     } finally {
       this.rmClient = null;
     }
@@ -343,6 +405,8 @@ public class UnmanagedApplicationManager {
     submitRequest.setApplicationSubmissionContext(context);
 
     context.setUnmanagedAM(true);
+    context.setKeepContainersAcrossApplicationAttempts(
+        this.keepContainersAcrossApplicationAttempts);
 
     LOG.info("Submitting unmanaged application {}", appId);
     this.rmClient.submitApplication(submitRequest);
@@ -374,8 +438,10 @@ public class UnmanagedApplicationManager {
         if (appStates.contains(state)) {
           if (state != YarnApplicationState.ACCEPTED) {
             throw new YarnRuntimeException(
-                "Received non-accepted application state: " + state
-                    + ". Application " + appId + " not the first attempt?");
+                "Received non-accepted application state: " + state + " for "
+                    + appId + ". This is likely because this is not the first "
+                    + "app attempt in home sub-cluster, and AMRMProxy HA "
+                    + "(yarn.nodemanager.amrmproxy.ha.enable) is not enabled.");
           }
           appAttemptId =
               getApplicationReport(appId).getCurrentApplicationAttemptId();
@@ -415,25 +481,25 @@ public class UnmanagedApplicationManager {
   }
 
   /**
-   * Gets the identifier of the unmanaged AM.
+   * Gets the amrmToken of the unmanaged AM.
    *
-   * @return the identifier of the unmanaged AM.
+   * @return the amrmToken of the unmanaged AM.
    * @throws IOException if getApplicationReport fails
    * @throws YarnException if getApplicationReport fails
    */
-  protected UnmanagedAMIdentifier getUAMIdentifier()
+  protected Token<AMRMTokenIdentifier> getUAMToken()
       throws IOException, YarnException {
     Token<AMRMTokenIdentifier> token = null;
     org.apache.hadoop.yarn.api.records.Token amrmToken =
-        getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken();
+        getApplicationReport(this.applicationId).getAMRMToken();
     if (amrmToken != null) {
       token = ConverterUtils.convertFromYarn(amrmToken, (Text) null);
     } else {
       LOG.warn(
           "AMRMToken not found in the application report for application: {}",
-          this.attemptId.getApplicationId());
+          this.applicationId);
     }
-    return new UnmanagedAMIdentifier(this.attemptId, token);
+    return token;
   }
 
   private ApplicationReport getApplicationReport(ApplicationId appId)
@@ -445,29 +511,6 @@ public class UnmanagedApplicationManager {
   }
 
   /**
-   * Data structure that encapsulates the application attempt identifier and the
-   * AMRMTokenIdentifier. Make it public because clients with HA need it.
-   */
-  public static class UnmanagedAMIdentifier {
-    private ApplicationAttemptId attemptId;
-    private Token<AMRMTokenIdentifier> token;
-
-    public UnmanagedAMIdentifier(ApplicationAttemptId attemptId,
-        Token<AMRMTokenIdentifier> token) {
-      this.attemptId = attemptId;
-      this.token = token;
-    }
-
-    public ApplicationAttemptId getAttemptId() {
-      return this.attemptId;
-    }
-
-    public Token<AMRMTokenIdentifier> getToken() {
-      return this.token;
-    }
-  }
-
-  /**
    * Data structure that encapsulates AllocateRequest and AsyncCallback
    * instance.
    */
@@ -549,8 +592,10 @@ public class UnmanagedApplicationManager {
           }
 
           request.setResponseId(lastResponseId);
+
           AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
-              request, rmProxy, registerRequest, attemptId);
+              request, rmProxy, registerRequest, applicationId);
+
           if (response == null) {
             throw new YarnException("Null allocateResponse from allocate");
           }
@@ -578,18 +623,17 @@ public class UnmanagedApplicationManager {
             LOG.debug("Interrupted while waiting for queue", ex);
           }
         } catch (IOException ex) {
-          LOG.warn(
-              "IO Error occurred while processing heart beat for " + attemptId,
-              ex);
+          LOG.warn("IO Error occurred while processing heart beat for "
+              + applicationId, ex);
         } catch (Throwable ex) {
           LOG.warn(
-              "Error occurred while processing heart beat for " + attemptId,
+              "Error occurred while processing heart beat for " + applicationId,
               ex);
         }
       }
 
       LOG.info("UnmanagedApplicationManager has been stopped for {}. "
-          + "AMRequestHandlerThread thread is exiting", attemptId);
+          + "AMRequestHandlerThread thread is exiting", applicationId);
     }
   }
 
@@ -600,8 +644,8 @@ public class UnmanagedApplicationManager {
       implements UncaughtExceptionHandler {
     @Override
     public void uncaughtException(Thread t, Throwable e) {
-      LOG.error("Heartbeat thread {} for application attempt {} crashed!",
-          t.getName(), attemptId, e);
+      LOG.error("Heartbeat thread {} for application {} crashed!",
+          t.getName(), applicationId, e);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
index 9f15d90..e1f08e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java
@@ -36,7 +36,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import 
org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@@ -63,16 +63,16 @@ public final class AMRMClientUtils {
   /**
    * Handle ApplicationNotRegistered exception and re-register.
    *
-   * @param attemptId app attemptId
+   * @param appId application Id
    * @param rmProxy RM proxy instance
    * @param registerRequest the AM re-register request
    * @throws YarnException if re-register fails
    */
   public static void handleNotRegisteredExceptionAndReRegister(
-      ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy,
+      ApplicationId appId, ApplicationMasterProtocol rmProxy,
       RegisterApplicationMasterRequest registerRequest) throws YarnException {
     LOG.info("App attempt {} not registered, most likely due to RM failover. "
-        + " Trying to re-register.", attemptId);
+        + " Trying to re-register.", appId);
     try {
       rmProxy.registerApplicationMaster(registerRequest);
     } catch (Exception e) {
@@ -93,25 +93,24 @@ public final class AMRMClientUtils {
    * @param request allocate request
    * @param rmProxy RM proxy
    * @param registerRequest the register request for re-register
-   * @param attemptId application attempt id
+   * @param appId application id
    * @return allocate response
    * @throws YarnException if RM call fails
    * @throws IOException if RM call fails
    */
   public static AllocateResponse allocateWithReRegister(AllocateRequest request,
       ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest,
-      ApplicationAttemptId attemptId) throws YarnException, IOException {
+      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
+      throws YarnException, IOException {
     try {
       return rmProxy.allocate(request);
     } catch (ApplicationMasterNotRegisteredException e) {
-      handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
+      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
           registerRequest);
       // reset responseId after re-register
       request.setResponseId(0);
       // retry allocate
-      return allocateWithReRegister(request, rmProxy, registerRequest,
-          attemptId);
+      return allocateWithReRegister(request, rmProxy, registerRequest, appId);
     }
   }
 
@@ -123,23 +122,22 @@ public final class AMRMClientUtils {
    * @param request finishApplicationMaster request
    * @param rmProxy RM proxy
    * @param registerRequest the register request for re-register
-   * @param attemptId application attempt id
+   * @param appId application id
    * @return finishApplicationMaster response
    * @throws YarnException if RM call fails
    * @throws IOException if RM call fails
    */
   public static FinishApplicationMasterResponse finishAMWithReRegister(
       FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest,
-      ApplicationAttemptId attemptId) throws YarnException, IOException {
+      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
+      throws YarnException, IOException {
     try {
       return rmProxy.finishApplicationMaster(request);
     } catch (ApplicationMasterNotRegisteredException ex) {
-      handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy,
+      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
           registerRequest);
       // retry finishAM after re-register
-      return finishAMWithReRegister(request, rmProxy, registerRequest,
-          attemptId);
+      return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index c49d6e8..c509994 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -171,10 +172,9 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol,
       LoggerFactory.getLogger(MockResourceManagerFacade.class);
 
   private HashSet<ApplicationId> applicationMap = new HashSet<>();
-  private HashMap<String, List<ContainerId>> applicationContainerIdMap =
-      new HashMap<String, List<ContainerId>>();
-  private HashMap<ContainerId, Container> allocatedContainerMap =
-      new HashMap<ContainerId, Container>();
+  private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>();
+  private HashMap<ApplicationAttemptId, List<ContainerId>>
+      applicationContainerIdMap = new HashMap<>();
   private AtomicInteger containerIndex = new AtomicInteger(0);
   private Configuration conf;
   private int subClusterId;
@@ -215,7 +215,7 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol,
     this.isRunning = mode;
   }
 
-  private static String getAppIdentifier() throws IOException {
+  private static ApplicationAttemptId getAppIdentifier() throws IOException {
     AMRMTokenIdentifier result = null;
     UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
     Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
@@ -225,7 +225,8 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol,
         break;
       }
     }
-    return result != null ? result.getApplicationAttemptId().toString() : "";
+    return result != null ? result.getApplicationAttemptId()
+        : ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0);
   }
 
   private void validateRunning() throws ConnectException {
@@ -240,19 +241,32 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol,
       throws YarnException, IOException {
 
     validateRunning();
-
-    String amrmToken = getAppIdentifier();
-    LOG.info("Registering application attempt: " + amrmToken);
+    ApplicationAttemptId attemptId = getAppIdentifier();
+    LOG.info("Registering application attempt: " + attemptId);
 
     shouldReRegisterNext = false;
 
+    List<Container> containersFromPreviousAttempt = null;
+
     synchronized (applicationContainerIdMap) {
-      if (applicationContainerIdMap.containsKey(amrmToken)) {
-        throw new InvalidApplicationMasterRequestException(
-            AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+      if (applicationContainerIdMap.containsKey(attemptId)) {
+        if (keepContainerOnUams.contains(attemptId.getApplicationId())) {
+          // For UAM with the keepContainersFromPreviousAttempt flag, return all
+          // running containers
+          containersFromPreviousAttempt = new ArrayList<>();
+          for (ContainerId containerId : applicationContainerIdMap
+              .get(attemptId)) {
+            containersFromPreviousAttempt.add(Container.newInstance(containerId,
+                null, null, null, null, null));
+          }
+        } else {
+          throw new InvalidApplicationMasterRequestException(
+              AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+        }
+      } else {
+        // Keep track of the containers that are returned to this application
+        applicationContainerIdMap.put(attemptId, new ArrayList<ContainerId>());
       }
-      // Keep track of the containers that are returned to this application
-      applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
     }
 
     // Make sure we wait for certain test cases last in the method
@@ -272,7 +286,7 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol,
     }
 
     return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
-        null, request.getHost(), null);
+        containersFromPreviousAttempt, request.getHost(), null);
   }
 
   @Override
@@ -282,8 +296,8 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol,
 
     validateRunning();
 
-    String amrmToken = getAppIdentifier();
-    LOG.info("Finishing application attempt: " + amrmToken);
+    ApplicationAttemptId attemptId = getAppIdentifier();
+    LOG.info("Finishing application attempt: " + attemptId);
 
     if (shouldReRegisterNext) {
       String message = "AM is not registered, should re-register.";
@@ -293,12 +307,9 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol,
 
     synchronized (applicationContainerIdMap) {
       // Remove the containers that were being tracked for this application
-      Assert.assertTrue("The application id is NOT registered: " + amrmToken,
-          applicationContainerIdMap.containsKey(amrmToken));
-      List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken);
-      for (ContainerId c : ids) {
-        allocatedContainerMap.remove(c);
-      }
+      Assert.assertTrue("The application id is NOT registered: " + attemptId,
+          applicationContainerIdMap.containsKey(attemptId));
+      applicationContainerIdMap.remove(attemptId);
     }
 
     return FinishApplicationMasterResponse.newInstance(
@@ -328,8 +339,8 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol,
           + "askList and releaseList in the same heartbeat");
     }
 
-    String amrmToken = getAppIdentifier();
-    LOG.info("Allocate from application attempt: " + amrmToken);
+    ApplicationAttemptId attemptId = getAppIdentifier();
+    LOG.info("Allocate from application attempt: " + attemptId);
 
     if (shouldReRegisterNext) {
       String message = "AM is not registered, should re-register.";
@@ -361,16 +372,16 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol,
             // will need it in future
             Assert.assertTrue(
                 "The application id is Not registered before allocate(): "
-                    + amrmToken,
-                applicationContainerIdMap.containsKey(amrmToken));
-            List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+                    + attemptId,
+                applicationContainerIdMap.containsKey(attemptId));
+            List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
             ids.add(containerId);
-            this.allocatedContainerMap.put(containerId, container);
           }
         }
       }
     }
 
+    List<ContainerStatus> completedList = new ArrayList<>();
     if (request.getReleaseList() != null
         && request.getReleaseList().size() > 0) {
       LOG.info("Releasing containers: " + request.getReleaseList().size());
@@ -378,9 +389,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
         Assert
             .assertTrue(
                 "The application id is not registered before allocate(): "
-                    + amrmToken,
-                applicationContainerIdMap.containsKey(amrmToken));
-        List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+                    + attemptId,
+                applicationContainerIdMap.containsKey(attemptId));
+        List<ContainerId> ids = applicationContainerIdMap.get(attemptId);
 
         for (ContainerId id : request.getReleaseList()) {
           boolean found = false;
@@ -396,18 +407,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
               + conf.get("AMRMTOKEN"), found);
 
           ids.remove(id);
-
-          // Return the released container back to the AM with new fake Ids. The
-          // test case does not care about the IDs. The IDs are faked because
-          // otherwise the LRM will throw duplication identifier exception. This
-          // returning of fake containers is ONLY done for testing purpose - for
-          // the test code to get confirmation that the sub-cluster resource
-          // managers received the release request
-          ContainerId fakeContainerId = ContainerId.newInstance(
-              getApplicationAttemptId(1), containerIndex.incrementAndGet());
-          Container fakeContainer = allocatedContainerMap.get(id);
-          fakeContainer.setId(fakeContainerId);
-          containerList.add(fakeContainer);
+          completedList.add(
+              ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0));
         }
       }
     }
@@ -418,8 +419,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     // Always issue a new AMRMToken as if RM rolled master key
     Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
 
-    return AllocateResponse.newInstance(0,
-        new ArrayList<ContainerStatus>(), containerList,
+    return AllocateResponse.newInstance(0, completedList, containerList,
         new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
         new ArrayList<NMToken>(), newAMRMToken,
         new ArrayList<UpdatedContainer>(), null);
@@ -438,6 +438,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     report.setApplicationId(request.getApplicationId());
     report.setCurrentApplicationAttemptId(
         ApplicationAttemptId.newInstance(request.getApplicationId(), 1));
+    report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], ""));
     response.setApplicationReport(report);
     return response;
   }
@@ -481,6 +482,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     }
     LOG.info("Application submitted: " + appId);
     applicationMap.add(appId);
+
+    if (request.getApplicationSubmissionContext().getUnmanagedAM()
+        || request.getApplicationSubmissionContext()
+            .getKeepContainersAcrossApplicationAttempts()) {
+      keepContainerOnUams.add(appId);
+    }
     return SubmitApplicationResponse.newInstance();
   }
 
@@ -497,6 +504,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
         throw new ApplicationNotFoundException(
             "Trying to kill an absent application: " + appId);
       }
+      keepContainerOnUams.remove(appId);
     }
     LOG.info("Force killing application: " + appId);
     return KillApplicationResponse.newInstance(true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
new file mode 100644
index 0000000..42be851
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for FederationRegistryClient.
+ */
+public class TestFederationRegistryClient {
+  private Configuration conf;
+  private UserGroupInformation user;
+  private RegistryOperations registry;
+  private FederationRegistryClient registryClient;
+
+  @Before
+  public void setup() throws Exception {
+    this.conf = new YarnConfiguration();
+
+    this.registry = new FSRegistryOperationsService();
+    this.registry.init(this.conf);
+    this.registry.start();
+
+    this.user = UserGroupInformation.getCurrentUser();
+    this.registryClient =
+        new FederationRegistryClient(this.conf, this.registry, this.user);
+    this.registryClient.cleanAllApplications();
+    Assert.assertEquals(0, this.registryClient.getAllApplications().size());
+  }
+
+  @After
+  public void breakDown() {
+    registryClient.cleanAllApplications();
+    Assert.assertEquals(0, registryClient.getAllApplications().size());
+    registry.stop();
+  }
+
+  @Test
+  public void testBasicCase() {
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    String scId1 = "subcluster1";
+    String scId2 = "subcluster2";
+
+    this.registryClient.writeAMRMTokenForUAM(appId, scId1,
+        new Token<AMRMTokenIdentifier>());
+    this.registryClient.writeAMRMTokenForUAM(appId, scId2,
+        new Token<AMRMTokenIdentifier>());
+    // Duplicate entry, should overwrite
+    this.registryClient.writeAMRMTokenForUAM(appId, scId1,
+        new Token<AMRMTokenIdentifier>());
+
+    Assert.assertEquals(1, this.registryClient.getAllApplications().size());
+    Assert.assertEquals(2,
+        this.registryClient.loadStateFromRegistry(appId).size());
+
+    this.registryClient.removeAppFromRegistry(appId);
+
+    Assert.assertEquals(0, this.registryClient.getAllApplications().size());
+    Assert.assertEquals(0,
+        this.registryClient.loadStateFromRegistry(appId).size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
index 9159cf7..5848d3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -65,7 +65,7 @@ public class TestUnmanagedApplicationManager {
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
 
     uam = new TestableUnmanagedApplicationManager(conf,
-        attemptId.getApplicationId(), null, "submitter", "appNameSuffix");
+        attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
   }
 
   protected void waitForCallBackCountAndCheckZeroPending(
@@ -88,7 +88,8 @@ public class TestUnmanagedApplicationManager {
   public void testBasicUsage()
       throws YarnException, IOException, InterruptedException {
 
-    createAndRegisterApplicationMaster(
+    launchUAM(attemptId);
+    registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
 
     allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
@@ -102,11 +103,48 @@ public class TestUnmanagedApplicationManager {
         attemptId);
   }
 
+  /*
+   * Test re-attaching of an existing UAM. This is for HA of UAM client.
+   */
+  @Test(timeout = 5000)
+  public void testUAMReAttach()
+      throws YarnException, IOException, InterruptedException {
+
+    launchUAM(attemptId);
+    registerApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+
+    allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+        attemptId);
+    // Wait for outstanding async allocate callback
+    waitForCallBackCountAndCheckZeroPending(callback, 1);
+
+    MockResourceManagerFacade rmProxy = uam.getRMProxy();
+    uam = new TestableUnmanagedApplicationManager(conf,
+        attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
+    uam.setRMProxy(rmProxy);
+
+    reAttachUAM(null, attemptId);
+    registerApplicationMaster(
+        RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
+
+    allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback,
+        attemptId);
+
+    // Wait for outstanding async allocate callback
+    waitForCallBackCountAndCheckZeroPending(callback, 2);
+
+    finishApplicationMaster(
+        FinishApplicationMasterRequest.newInstance(null, null, null),
+        attemptId);
+  }
+
   @Test(timeout = 5000)
   public void testReRegister()
       throws YarnException, IOException, InterruptedException {
 
-    createAndRegisterApplicationMaster(
+    launchUAM(attemptId);
+    registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
 
     uam.setShouldReRegisterNext();
@@ -137,7 +175,8 @@ public class TestUnmanagedApplicationManager {
       @Override
       public void run() {
         try {
-          createAndRegisterApplicationMaster(
+          launchUAM(attemptId);
+          registerApplicationMaster(
               RegisterApplicationMasterRequest.newInstance(null, 1001, null),
               attemptId);
         } catch (Exception e) {
@@ -221,7 +260,8 @@ public class TestUnmanagedApplicationManager {
   @Test
   public void testForceKill()
       throws YarnException, IOException, InterruptedException {
-    createAndRegisterApplicationMaster(
+    launchUAM(attemptId);
+    registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId);
     uam.forceKillApplication();
 
@@ -241,19 +281,40 @@ public class TestUnmanagedApplicationManager {
     return ugi;
   }
 
-  protected RegisterApplicationMasterResponse
-      createAndRegisterApplicationMaster(
-          final RegisterApplicationMasterRequest request,
-          ApplicationAttemptId appAttemptId)
-          throws YarnException, IOException, InterruptedException {
+  protected Token<AMRMTokenIdentifier> launchUAM(
+      ApplicationAttemptId appAttemptId)
+      throws IOException, InterruptedException {
+    return getUGIWithToken(appAttemptId)
+        .doAs(new PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>() {
+          @Override
+          public Token<AMRMTokenIdentifier> run() throws Exception {
+            return uam.launchUAM();
+          }
+        });
+  }
+
+  protected void reAttachUAM(final Token<AMRMTokenIdentifier> uamToken,
+      ApplicationAttemptId appAttemptId)
+      throws IOException, InterruptedException {
+    getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Token<AMRMTokenIdentifier> run() throws Exception {
+        uam.reAttachUAM(uamToken);
+        return null;
+      }
+    });
+  }
+
+  protected RegisterApplicationMasterResponse registerApplicationMaster(
+      final RegisterApplicationMasterRequest request,
+      ApplicationAttemptId appAttemptId)
+      throws YarnException, IOException, InterruptedException {
     return getUGIWithToken(appAttemptId).doAs(
         new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
           @Override
           public RegisterApplicationMasterResponse run()
               throws YarnException, IOException {
-            RegisterApplicationMasterResponse response =
-                uam.createAndRegisterApplicationMaster(request);
-            return response;
+            return uam.registerApplicationMaster(request);
           }
         });
   }
@@ -311,8 +372,9 @@ public class TestUnmanagedApplicationManager {
 
     public TestableUnmanagedApplicationManager(Configuration conf,
         ApplicationId appId, String queueName, String submitter,
-        String appNameSuffix) {
-      super(conf, appId, queueName, submitter, appNameSuffix);
+        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
+      super(conf, appId, queueName, submitter, appNameSuffix,
+          keepContainersAcrossApplicationAttempts);
     }
 
     @SuppressWarnings("unchecked")
@@ -330,6 +392,14 @@ public class TestUnmanagedApplicationManager {
         rmProxy.setShouldReRegisterNext();
       }
     }
+
+    public MockResourceManagerFacade getRMProxy() {
+      return rmProxy;
+    }
+
+    public void setRMProxy(MockResourceManagerFacade proxy) {
+      this.rmProxy = proxy;
+    }
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
index c355a8b..92afcb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -67,4 +69,18 @@ public interface AMRMProxyApplicationContext {
    */
   Context getNMCotext();
 
+  /**
+   * Gets the credentials of this application.
+   *
+   * @return the credentials.
+   */
+  Credentials getCredentials();
+
+  /**
+   * Gets the registry client.
+   *
+   * @return the registry.
+   */
+  RegistryOperations getRegistryClient();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6db6c98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
index 9938b37..8a02095 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -42,6 +44,8 @@ public class AMRMProxyApplicationContextImpl implements
   private Integer localTokenKeyId;
   private Token<AMRMTokenIdentifier> amrmToken;
   private Token<AMRMTokenIdentifier> localToken;
+  private Credentials credentials;
+  private RegistryOperations registry;
 
   /**
    * Create an instance of the AMRMProxyApplicationContext.
@@ -52,17 +56,23 @@ public class AMRMProxyApplicationContextImpl implements
    * @param user user name of the application
    * @param amrmToken amrmToken issued by RM
    * @param localToken amrmToken issued by AMRMProxy
+   * @param credentials application credentials
+   * @param registry Yarn Registry client
    */
-  public AMRMProxyApplicationContextImpl(Context nmContext,
-      Configuration conf, ApplicationAttemptId applicationAttemptId,
-      String user, Token<AMRMTokenIdentifier> amrmToken,
-      Token<AMRMTokenIdentifier> localToken) {
+  @SuppressWarnings("checkstyle:parameternumber")
+  public AMRMProxyApplicationContextImpl(Context nmContext, Configuration conf,
+      ApplicationAttemptId applicationAttemptId, String user,
+      Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken, Credentials credentials,
+      RegistryOperations registry) {
     this.nmContext = nmContext;
     this.conf = conf;
     this.applicationAttemptId = applicationAttemptId;
     this.user = user;
     this.amrmToken = amrmToken;
     this.localToken = localToken;
+    this.credentials = credentials;
+    this.registry = registry;
   }
 
   @Override
@@ -88,11 +98,14 @@ public class AMRMProxyApplicationContextImpl implements
   /**
    * Sets the application's AMRMToken.
    *
-   * @param amrmToken amrmToken issued by RM
+   * @param amrmToken the new amrmToken from RM
+   * @return whether the saved token is updated to a different value
    */
-  public synchronized void setAMRMToken(
+  public synchronized boolean setAMRMToken(
       Token<AMRMTokenIdentifier> amrmToken) {
+    Token<AMRMTokenIdentifier> oldValue = this.amrmToken;
     this.amrmToken = amrmToken;
+    return !this.amrmToken.equals(oldValue);
   }
 
   @Override
@@ -134,4 +147,14 @@ public class AMRMProxyApplicationContextImpl implements
   public Context getNMCotext() {
     return nmContext;
   }
+
+  @Override
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
+
+  @Override
+  public RegistryOperations getRegistryClient() {
+    return this.registry;
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to