YARN-6128. Add support for AMRMProxy HA. (Botong Huang via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d5f66888 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d5f66888 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d5f66888 Branch: refs/heads/YARN-5881 Commit: d5f66888b8d767ee6706fab9950c194a1bf26d32 Parents: 0940e4f Author: Subru Krishnan <su...@apache.org> Authored: Fri Nov 17 17:39:06 2017 -0800 Committer: Subru Krishnan <su...@apache.org> Committed: Fri Nov 17 17:39:06 2017 -0800 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 13 + .../src/main/resources/yarn-default.xml | 21 ++ .../hadoop-yarn-server-common/pom.xml | 5 + .../utils/FederationRegistryClient.java | 338 +++++++++++++++++++ .../yarn/server/uam/UnmanagedAMPoolManager.java | 141 ++++++-- .../server/uam/UnmanagedApplicationManager.java | 212 +++++++----- .../yarn/server/utils/AMRMClientUtils.java | 30 +- .../yarn/server/MockResourceManagerFacade.java | 103 +++--- .../utils/TestFederationRegistryClient.java | 90 +++++ .../uam/TestUnmanagedApplicationManager.java | 100 +++++- .../amrmproxy/AMRMProxyApplicationContext.java | 16 + .../AMRMProxyApplicationContextImpl.java | 35 +- .../nodemanager/amrmproxy/AMRMProxyService.java | 83 ++++- .../amrmproxy/FederationInterceptor.java | 221 +++++++++++- .../containermanager/ContainerManagerImpl.java | 9 +- .../amrmproxy/BaseAMRMProxyTest.java | 14 +- .../amrmproxy/TestAMRMProxyService.java | 21 +- .../amrmproxy/TestFederationInterceptor.java | 126 ++++++- .../TestableFederationInterceptor.java | 29 +- .../hadoop/yarn/server/MiniYARNCluster.java | 6 +- .../src/site/markdown/Federation.md | 11 +- 21 files changed, 1345 insertions(+), 279 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 34257ed..ead9977 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2096,6 +2096,9 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = "org.apache.hadoop.yarn.server.nodemanager.amrmproxy." + "DefaultRequestInterceptor"; + public static final String AMRM_PROXY_HA_ENABLED = NM_PREFIX + + "amrmproxy.ha.enable"; + public static final boolean DEFAULT_AMRM_PROXY_HA_ENABLED = false; /** * Default platform-agnostic CLASSPATH for YARN applications. A @@ -2930,6 +2933,11 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; + public static final String FEDERATION_REGISTRY_BASE_KEY = + FEDERATION_PREFIX + "registry.base-dir"; + public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = + "yarnfederation/"; + // 5 minutes public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; @@ -3087,6 +3095,11 @@ public class YarnConfiguration extends Configuration { // Other Configs //////////////////////////////// + public static final String YARN_REGISTRY_CLASS = + YARN_PREFIX + "registry.class"; + public static final String DEFAULT_YARN_REGISTRY_CLASS = + "org.apache.hadoop.registry.client.impl.FSRegistryOperationsService"; + /** * Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead. * The interval of the yarn client's querying application state after http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e90d0f2..12cb902 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2826,7 +2826,20 @@ <value>300</value> </property> + <property> + <description>The registry base directory for federation.</description> + <name>yarn.federation.registry.base-dir</name> + <value>yarnfederation/</value> + </property> + <!-- Other Configuration --> + + <property> + <description>The registry implementation to use.</description> + <name>yarn.registry.class</name> + <value>org.apache.hadoop.registry.client.impl.FSRegistryOperationsService</value> + </property> + <property> <description>The interval that the yarn client library uses to poll the completion status of the asynchronous API of application client protocol. @@ -2989,6 +3002,14 @@ <property> <description> + Whether AMRMProxy HA is enabled. + </description> + <name>yarn.nodemanager.amrmproxy.ha.enable</name> + <value>false</value> + </property> + + <property> + <description> Setting that controls whether distributed scheduling is enabled. </description> <name>yarn.nodemanager.distributed-scheduling.enabled</name> http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 43ae3af..cd5195d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -67,6 +67,11 @@ </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-registry</artifactId> + </dependency> + + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java new file mode 100644 index 0000000..6624318 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.registry.client.api.BindFlags; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Helper class that handles reads and writes to Yarn Registry to support UAM HA + * and second attempt. + */ +public class FederationRegistryClient { + private static final Logger LOG = + LoggerFactory.getLogger(FederationRegistryClient.class); + + private RegistryOperations registry; + + private UserGroupInformation user; + + // AppId -> SubClusterId -> UAM token + private Map<ApplicationId, Map<String, Token<AMRMTokenIdentifier>>> + appSubClusterTokenMap; + + // Structure in registry: <registryBaseDir>/<AppId>/<SubClusterId> -> UAMToken + private String registryBaseDir; + + public FederationRegistryClient(Configuration conf, + RegistryOperations registry, UserGroupInformation user) { + this.registry = registry; + this.user = user; + this.appSubClusterTokenMap = new ConcurrentHashMap<>(); + this.registryBaseDir = + conf.get(YarnConfiguration.FEDERATION_REGISTRY_BASE_KEY, + YarnConfiguration.DEFAULT_FEDERATION_REGISTRY_BASE_KEY); + LOG.info("Using registry {} with base directory: {}", + this.registry.getClass().getName(), this.registryBaseDir); + } + + /** + * Get the list of known applications in the registry. + * + * @return the list of known applications + */ + public List<String> getAllApplications() { + // Suppress the exception here because it is valid that the entry does not + // exist + List<String> applications = null; + try { + applications = listDirRegistry(this.registry, this.user, + getRegistryKey(null, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + if (applications == null) { + // It is valid for listDirRegistry to return null + return new ArrayList<>(); + } + return applications; + } + + /** + * For testing, delete all application records in registry. + */ + @VisibleForTesting + public void cleanAllApplications() { + try { + removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null), + true, false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from removeKeyRegistry", e); + } + } + + /** + * Write/update the UAM token for an application and a sub-cluster. + * + * @param subClusterId sub-cluster id of the token + * @param token the UAM of the application + * @return whether the amrmToken is added or updated to a new value + */ + public boolean writeAMRMTokenForUAM(ApplicationId appId, + String subClusterId, Token<AMRMTokenIdentifier> token) { + Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + if (subClusterTokenMap == null) { + subClusterTokenMap = new ConcurrentHashMap<>(); + this.appSubClusterTokenMap.put(appId, subClusterTokenMap); + } + + boolean update = !token.equals(subClusterTokenMap.get(subClusterId)); + if (!update) { + LOG.debug("Same amrmToken received from {}, skip writing registry for {}", + subClusterId, appId); + return update; + } + + LOG.info("Writing/Updating amrmToken for {} to registry for {}", + subClusterId, appId); + try { + // First, write the token entry + writeRegistry(this.registry, this.user, + getRegistryKey(appId, subClusterId), token.encodeToUrlString(), true); + + // Then update the subClusterTokenMap + subClusterTokenMap.put(subClusterId, token); + } catch (YarnException | IOException e) { + LOG.error( + "Failed writing AMRMToken to registry for subcluster " + subClusterId, + e); + } + return update; + } + + /** + * Load the information of one application from registry. + * + * @param appId application id + * @return the sub-cluster to UAM token mapping + */ + public Map<String, Token<AMRMTokenIdentifier>> + loadStateFromRegistry(ApplicationId appId) { + Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>(); + // Suppress the exception here because it is valid that the entry does not + // exist + List<String> subclusters = null; + try { + subclusters = listDirRegistry(this.registry, this.user, + getRegistryKey(appId, null), false); + } catch (YarnException e) { + LOG.warn("Unexpected exception from listDirRegistry", e); + } + + if (subclusters == null) { + LOG.info("Application {} does not exist in registry", appId); + return retMap; + } + + // Read the amrmToken for each sub-cluster with an existing UAM + for (String scId : subclusters) { + LOG.info("Reading amrmToken for subcluster {} for {}", scId, appId); + String key = getRegistryKey(appId, scId); + try { + String tokenString = readRegistry(this.registry, this.user, key, true); + if (tokenString == null) { + throw new YarnException("Null string from readRegistry key " + key); + } + Token<AMRMTokenIdentifier> amrmToken = new Token<>(); + amrmToken.decodeFromUrlString(tokenString); + // Clear the service field, as if RM just issued the token + amrmToken.setService(new Text()); + + retMap.put(scId, amrmToken); + } catch (Exception e) { + LOG.error("Failed reading registry key " + key + + ", skipping subcluster " + scId, e); + } + } + + // Override existing map if there + this.appSubClusterTokenMap.put(appId, new ConcurrentHashMap<>(retMap)); + return retMap; + } + + /** + * Remove an application from registry. + * + * @param appId application id + */ + public void removeAppFromRegistry(ApplicationId appId) { + Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap = + this.appSubClusterTokenMap.get(appId); + LOG.info("Removing all registry entries for {}", appId); + + if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) { + return; + } + + // Lastly remove the application directory + String key = getRegistryKey(appId, null); + try { + removeKeyRegistry(this.registry, this.user, key, true, true); + subClusterTokenMap.clear(); + } catch (YarnException e) { + LOG.error("Failed removing registry directory key " + key, e); + } + } + + private String getRegistryKey(ApplicationId appId, String fileName) { + if (appId == null) { + return this.registryBaseDir; + } + if (fileName == null) { + return this.registryBaseDir + appId.toString(); + } + return this.registryBaseDir + appId.toString() + "/" + fileName; + } + + private String readRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + // Use the ugi loaded with app credentials to access registry + String result = ugi.doAs(new PrivilegedAction<String>() { + @Override + public String run() { + try { + ServiceRecord value = registryImpl.resolve(key); + if (value != null) { + return value.description; + } + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry resolve key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry resolve key " + key + " failed"); + } + return result; + } + + private void removeKeyRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean recursive, + final boolean throwIfFails) throws YarnException { + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction<Boolean>() { + @Override + public Boolean run() { + try { + registryImpl.delete(key, recursive); + return true; + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry remove key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry remove key " + key + " failed"); + } + } + + /** + * Write registry entry, override if exists. + */ + private void writeRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final String value, + final boolean throwIfFails) throws YarnException { + + final ServiceRecord recordValue = new ServiceRecord(); + recordValue.description = value; + // Use the ugi loaded with app credentials to access registry + boolean success = ugi.doAs(new PrivilegedAction<Boolean>() { + @Override + public Boolean run() { + try { + registryImpl.bind(key, recordValue, BindFlags.OVERWRITE); + return true; + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry write key " + key + " failed", e); + } + } + return false; + } + }); + if (!success && throwIfFails) { + throw new YarnException("Registry write key " + key + " failed"); + } + } + + /** + * List the sub directories in the given directory. + */ + private List<String> listDirRegistry(final RegistryOperations registryImpl, + UserGroupInformation ugi, final String key, final boolean throwIfFails) + throws YarnException { + List<String> result = ugi.doAs(new PrivilegedAction<List<String>>() { + @Override + public List<String> run() { + try { + return registryImpl.list(key); + } catch (Throwable e) { + if (throwIfFails) { + LOG.error("Registry list key " + key + " failed", e); + } + } + return null; + } + }); + if (result == null && throwIfFails) { + throw new YarnException("Registry list key " + key + " failed"); + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 08aee77..677c4e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -44,9 +45,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.utils.AMRMClientUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; @@ -67,7 +69,7 @@ public class UnmanagedAMPoolManager extends AbstractService { // Map from uamId to UAM instances private Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap; - private Map<String, ApplicationAttemptId> attemptIdMap; + private Map<String, ApplicationId> appIdMap; private ExecutorService threadpool; @@ -82,7 +84,7 @@ public class UnmanagedAMPoolManager extends AbstractService { this.threadpool = Executors.newCachedThreadPool(); } this.unmanagedAppMasterMap = new ConcurrentHashMap<>(); - this.attemptIdMap = new ConcurrentHashMap<>(); + this.appIdMap = new ConcurrentHashMap<>(); super.serviceStart(); } @@ -114,7 +116,7 @@ public class UnmanagedAMPoolManager extends AbstractService { public KillApplicationResponse call() throws Exception { try { LOG.info("Force-killing UAM id " + uamId + " for application " - + attemptIdMap.get(uamId)); + + appIdMap.get(uamId)); return unmanagedAppMasterMap.remove(uamId).forceKillApplication(); } catch (Exception e) { LOG.error("Failed to kill unmanaged application master", e); @@ -132,7 +134,7 @@ public class UnmanagedAMPoolManager extends AbstractService { LOG.error("Failed to kill unmanaged application master", e); } } - this.attemptIdMap.clear(); + this.appIdMap.clear(); super.serviceStop(); } @@ -145,13 +147,18 @@ public class UnmanagedAMPoolManager extends AbstractService { * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery. + * @see ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean) * @return uamId for the UAM * @throws YarnException if registerApplicationMaster fails * @throws IOException if registerApplicationMaster fails */ public String createAndRegisterNewUAM( RegisterApplicationMasterRequest registerRequest, Configuration conf, - String queueName, String submitter, String appNameSuffix) + String queueName, String submitter, String appNameSuffix, + boolean keepContainersAcrossApplicationAttempts) throws YarnException, IOException { ApplicationId appId = null; ApplicationClientProtocol rmClient; @@ -173,45 +180,93 @@ public class UnmanagedAMPoolManager extends AbstractService { rmClient = null; } - createAndRegisterNewUAM(appId.toString(), registerRequest, conf, appId, - queueName, submitter, appNameSuffix); + // Launch the UAM in RM + launchUAM(appId.toString(), conf, appId, queueName, submitter, + appNameSuffix, keepContainersAcrossApplicationAttempts); + + // Register the UAM application + registerApplicationMaster(appId.toString(), registerRequest); + + // Returns the appId as uamId return appId.toString(); } /** - * Create a new UAM and register the application, using the provided uamId and - * appId. + * Launch a new UAM, using the provided uamId and appId. * - * @param uamId identifier for the UAM - * @param registerRequest RegisterApplicationMasterRequest + * @param uamId uam Id * @param conf configuration for this UAM * @param appId application id for the UAM * @param queueName queue of the application * @param submitter submitter name of the UAM * @param appNameSuffix application name suffix for the UAM - * @return RegisterApplicationMasterResponse - * @throws YarnException if registerApplicationMaster fails - * @throws IOException if registerApplicationMaster fails + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery. + * @see ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean) + * @return UAM token + * @throws YarnException if fails + * @throws IOException if fails */ - public RegisterApplicationMasterResponse createAndRegisterNewUAM(String uamId, - RegisterApplicationMasterRequest registerRequest, Configuration conf, + public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf, + ApplicationId appId, String queueName, String submitter, + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) + throws YarnException, IOException { + + if (this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " already exists"); + } + UnmanagedApplicationManager uam = createUAM(conf, appId, queueName, + submitter, appNameSuffix, keepContainersAcrossApplicationAttempts); + // Put the UAM into map first before initializing it to avoid additional UAM + // for the same uamId being created concurrently + this.unmanagedAppMasterMap.put(uamId, uam); + + Token<AMRMTokenIdentifier> amrmToken = null; + try { + LOG.info("Launching UAM id {} for application {}", uamId, appId); + amrmToken = uam.launchUAM(); + } catch (Exception e) { + // Add the map earlier and remove here if register failed because we want + // to make sure there is only one uam instance per uamId at any given time + this.unmanagedAppMasterMap.remove(uamId); + throw e; + } + + this.appIdMap.put(uamId, uam.getAppId()); + return amrmToken; + } + + /** + * Re-attach to an existing UAM, using the provided uamIdentifier. + * + * @param uamId uam Id + * @param conf configuration for this UAM + * @param appId application id for the UAM + * @param queueName queue of the application + * @param submitter submitter name of the UAM + * @param appNameSuffix application name suffix for the UAM + * @param uamToken UAM token + * @throws YarnException if fails + * @throws IOException if fails + */ + public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) throws YarnException, IOException { + String appNameSuffix, Token<AMRMTokenIdentifier> uamToken) + throws YarnException, IOException { if (this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " already exists"); } UnmanagedApplicationManager uam = - createUAM(conf, appId, queueName, submitter, appNameSuffix); + createUAM(conf, appId, queueName, submitter, appNameSuffix, true); // Put the UAM into map first before initializing it to avoid additional UAM // for the same uamId being created concurrently this.unmanagedAppMasterMap.put(uamId, uam); - RegisterApplicationMasterResponse response = null; try { - LOG.info("Creating and registering UAM id {} for application {}", uamId, - appId); - response = uam.createAndRegisterApplicationMaster(registerRequest); + LOG.info("Reattaching UAM id {} for application {}", uamId, appId); + uam.reAttachUAM(uamToken); } catch (Exception e) { // Add the map earlier and remove here if register failed because we want // to make sure there is only one uam instance per uamId at any given time @@ -219,8 +274,7 @@ public class UnmanagedAMPoolManager extends AbstractService { throw e; } - this.attemptIdMap.put(uamId, uam.getAttemptId()); - return response; + this.appIdMap.put(uamId, uam.getAppId()); } /** @@ -231,20 +285,42 @@ public class UnmanagedAMPoolManager extends AbstractService { * @param queueName queue of the application * @param submitter submitter name of the application * @param appNameSuffix application name suffix + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM * @return the UAM instance */ @VisibleForTesting protected UnmanagedApplicationManager createUAM(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { return new UnmanagedApplicationManager(conf, appId, queueName, submitter, - appNameSuffix); + appNameSuffix, keepContainersAcrossApplicationAttempts); + } + + /** + * Register application master for the UAM. + * + * @param uamId uam Id + * @param registerRequest RegisterApplicationMasterRequest + * @return register response + * @throws YarnException if register fails + * @throws IOException if register fails + */ + public RegisterApplicationMasterResponse registerApplicationMaster( + String uamId, RegisterApplicationMasterRequest registerRequest) + throws YarnException, IOException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + LOG.info("Registering UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); + return this.unmanagedAppMasterMap.get(uamId) + .registerApplicationMaster(registerRequest); } /** * AllocateAsync to an UAM. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request AllocateRequest * @param callback callback for response * @throws YarnException if allocate fails @@ -262,7 +338,7 @@ public class UnmanagedAMPoolManager extends AbstractService { /** * Finish an UAM/application. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @param request FinishApplicationMasterRequest * @return FinishApplicationMasterResponse * @throws YarnException if finishApplicationMaster call fails @@ -274,14 +350,15 @@ public class UnmanagedAMPoolManager extends AbstractService { if (!this.unmanagedAppMasterMap.containsKey(uamId)) { throw new YarnException("UAM " + uamId + " does not exist"); } - LOG.info("Finishing application for UAM id {} ", uamId); + LOG.info("Finishing UAM id {} for application {}", uamId, + this.appIdMap.get(uamId)); FinishApplicationMasterResponse response = this.unmanagedAppMasterMap.get(uamId).finishApplicationMaster(request); if (response.getIsUnregistered()) { // Only remove the UAM when the unregister finished this.unmanagedAppMasterMap.remove(uamId); - this.attemptIdMap.remove(uamId); + this.appIdMap.remove(uamId); LOG.info("UAM id {} is unregistered", uamId); } return response; @@ -301,7 +378,7 @@ public class UnmanagedAMPoolManager extends AbstractService { /** * Return whether an UAM exists. * - * @param uamId identifier for the UAM + * @param uamId uam Id * @return UAM exists or not */ public boolean hasUAMId(String uamId) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 6531a75..3f4a110 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -50,7 +50,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -90,7 +92,6 @@ public class UnmanagedApplicationManager { private AMRequestHandlerThread handlerThread; private ApplicationMasterProtocol rmProxy; private ApplicationId applicationId; - private ApplicationAttemptId attemptId; private String submitter; private String appNameSuffix; private Configuration conf; @@ -101,9 +102,31 @@ public class UnmanagedApplicationManager { private ApplicationClientProtocol rmClient; private long asyncApiPollIntervalMillis; private RecordFactory recordFactory; + private boolean keepContainersAcrossApplicationAttempts; + /* + * This flag is used as an indication that this method launchUAM/reAttachUAM + * is called (and perhaps blocked in initializeUnmanagedAM below due to RM + * connection/failover issue and not finished yet). Set the flag before + * calling the blocking call to RM. + */ + private boolean connectionInitiated; + + /** + * Constructor. + * + * @param conf configuration + * @param appId application Id to use for this UAM + * @param queueName the queue of the UAM + * @param submitter user name of the app + * @param appNameSuffix the app name suffix to use + * @param keepContainersAcrossApplicationAttempts keep container flag for UAM + * recovery. See {@link ApplicationSubmissionContext + * #setKeepContainersAcrossApplicationAttempts(boolean)} + */ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId, - String queueName, String submitter, String appNameSuffix) { + String queueName, String submitter, String appNameSuffix, + boolean keepContainersAcrossApplicationAttempts) { Preconditions.checkNotNull(conf, "Configuration cannot be null"); Preconditions.checkNotNull(appId, "ApplicationId cannot be null"); Preconditions.checkNotNull(submitter, "App submitter cannot be null"); @@ -116,6 +139,7 @@ public class UnmanagedApplicationManager { this.handlerThread = new AMRequestHandlerThread(); this.requestQueue = new LinkedBlockingQueue<>(); this.rmProxy = null; + this.connectionInitiated = false; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.asyncApiPollIntervalMillis = conf.getLong( @@ -123,45 +147,84 @@ public class UnmanagedApplicationManager { YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, YarnConfiguration. DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); + this.keepContainersAcrossApplicationAttempts = + keepContainersAcrossApplicationAttempts; + } + + /** + * Launch a new UAM in the resource manager. + * + * @return identifier uam identifier + * @throws YarnException if fails + * @throws IOException if fails + */ + public Token<AMRMTokenIdentifier> launchUAM() + throws YarnException, IOException { + this.connectionInitiated = true; + + // Blocking call to RM + Token<AMRMTokenIdentifier> amrmToken = + initializeUnmanagedAM(this.applicationId); + + // Creates the UAM connection + createUAMProxy(amrmToken); + return amrmToken; + } + + /** + * Re-attach to an existing UAM in the resource manager. + * + * @param amrmToken the UAM token + * @throws IOException if re-attach fails + * @throws YarnException if re-attach fails + */ + public void reAttachUAM(Token<AMRMTokenIdentifier> amrmToken) + throws IOException, YarnException { + this.connectionInitiated = true; + + // Creates the UAM connection + createUAMProxy(amrmToken); + } + + protected void createUAMProxy(Token<AMRMTokenIdentifier> amrmToken) + throws IOException { + this.userUgi = UserGroupInformation.createProxyUser( + this.applicationId.toString(), UserGroupInformation.getCurrentUser()); + this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, + this.userUgi, amrmToken); } /** * Registers this {@link UnmanagedApplicationManager} with the resource * manager. * - * @param request the register request - * @return the register response + * @param request RegisterApplicationMasterRequest + * @return register response * @throws YarnException if register fails * @throws IOException if register fails */ - public RegisterApplicationMasterResponse createAndRegisterApplicationMaster( + public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { - // This need to be done first in this method, because it is used as an - // indication that this method is called (and perhaps blocked due to RM - // connection and not finished yet) + // Save the register request for re-register later this.registerRequest = request; - // attemptId will be available after this call - UnmanagedAMIdentifier identifier = - initializeUnmanagedAM(this.applicationId); - - try { - this.userUgi = UserGroupInformation.createProxyUser( - identifier.getAttemptId().toString(), - UserGroupInformation.getCurrentUser()); - } catch (IOException e) { - LOG.error("Exception while trying to get current user", e); - throw new YarnRuntimeException(e); - } - - this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, - this.userUgi, identifier.getToken()); - - LOG.info("Registering the Unmanaged application master {}", this.attemptId); + // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM. + // We do not expect application already registered exception here + LOG.info("Registering the Unmanaged application master {}", + this.applicationId); RegisterApplicationMasterResponse response = this.rmProxy.registerApplicationMaster(this.registerRequest); + for (Container container : response.getContainersFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing running container " + + container.getId()); + } + for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { + LOG.info("RegisterUAM returned existing NM token for node " + + nmToken.getNodeId()); + } + // Only when register succeed that we start the heartbeat thread this.handlerThread.setUncaughtExceptionHandler( new HeartBeatThreadUncaughtExceptionHandler()); @@ -187,11 +250,11 @@ public class UnmanagedApplicationManager { this.handlerThread.shutdown(); if (this.rmProxy == null) { - if (this.registerRequest != null) { - // This is possible if the async registerApplicationMaster is still + if (this.connectionInitiated) { + // This is possible if the async launchUAM is still // blocked and retrying. Return a dummy response in this case. LOG.warn("Unmanaged AM still not successfully launched/registered yet." - + " Stopping the UAM client thread anyways."); + + " Stopping the UAM heartbeat thread anyways."); return FinishApplicationMasterResponse.newInstance(false); } else { throw new YarnException("finishApplicationMaster should not " @@ -199,7 +262,7 @@ public class UnmanagedApplicationManager { } } return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, - this.registerRequest, this.attemptId); + this.registerRequest, this.applicationId); } /** @@ -212,7 +275,7 @@ public class UnmanagedApplicationManager { public KillApplicationResponse forceKillApplication() throws IOException, YarnException { KillApplicationRequest request = - KillApplicationRequest.newInstance(this.attemptId.getApplicationId()); + KillApplicationRequest.newInstance(this.applicationId); this.handlerThread.shutdown(); @@ -240,29 +303,29 @@ public class UnmanagedApplicationManager { LOG.debug("Interrupted while waiting to put on response queue", ex); } // Two possible cases why the UAM is not successfully registered yet: - // 1. registerApplicationMaster is not called at all. Should throw here. - // 2. registerApplicationMaster is called but hasn't successfully returned. + // 1. launchUAM is not called at all. Should throw here. + // 2. launchUAM is called but hasn't successfully returned. // // In case 2, we have already save the allocate request above, so if the // registration succeed later, no request is lost. if (this.rmProxy == null) { - if (this.registerRequest != null) { + if (this.connectionInitiated) { LOG.info("Unmanaged AM still not successfully launched/registered yet." + " Saving the allocate request and send later."); } else { throw new YarnException( - "AllocateAsync should not be called before createAndRegister"); + "AllocateAsync should not be called before launchUAM"); } } } /** - * Returns the application attempt id of the UAM. + * Returns the application id of the UAM. * - * @return attempt id of the UAM + * @return application id of the UAM */ - public ApplicationAttemptId getAttemptId() { - return this.attemptId; + public ApplicationId getAppId() { + return this.applicationId; } /** @@ -287,15 +350,15 @@ public class UnmanagedApplicationManager { * Launch and initialize an unmanaged AM. First, it creates a new application * on the RM and negotiates a new attempt id. Then it waits for the RM * application attempt state to reach YarnApplicationAttemptState.LAUNCHED - * after which it returns the AM-RM token and the attemptId. + * after which it returns the AM-RM token. * * @param appId application id - * @return the UAM identifier + * @return the UAM token * @throws IOException if initialize fails * @throws YarnException if initialize fails */ - protected UnmanagedAMIdentifier initializeUnmanagedAM(ApplicationId appId) - throws IOException, YarnException { + protected Token<AMRMTokenIdentifier> initializeUnmanagedAM( + ApplicationId appId) throws IOException, YarnException { try { UserGroupInformation appSubmitter = UserGroupInformation.createRemoteUser(this.submitter); @@ -306,13 +369,12 @@ public class UnmanagedApplicationManager { submitUnmanagedApp(appId); // Monitor the application attempt to wait for launch state - ApplicationAttemptReport attemptReport = monitorCurrentAppAttempt(appId, + monitorCurrentAppAttempt(appId, EnumSet.of(YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING, YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED), YarnApplicationAttemptState.LAUNCHED); - this.attemptId = attemptReport.getApplicationAttemptId(); - return getUAMIdentifier(); + return getUAMToken(); } finally { this.rmClient = null; } @@ -343,6 +405,8 @@ public class UnmanagedApplicationManager { submitRequest.setApplicationSubmissionContext(context); context.setUnmanagedAM(true); + context.setKeepContainersAcrossApplicationAttempts( + this.keepContainersAcrossApplicationAttempts); LOG.info("Submitting unmanaged application {}", appId); this.rmClient.submitApplication(submitRequest); @@ -374,8 +438,10 @@ public class UnmanagedApplicationManager { if (appStates.contains(state)) { if (state != YarnApplicationState.ACCEPTED) { throw new YarnRuntimeException( - "Received non-accepted application state: " + state - + ". Application " + appId + " not the first attempt?"); + "Received non-accepted application state: " + state + " for " + + appId + ". This is likely because this is not the first " + + "app attempt in home sub-cluster, and AMRMProxy HA " + + "(yarn.nodemanager.amrmproxy.ha.enable) is not enabled."); } appAttemptId = getApplicationReport(appId).getCurrentApplicationAttemptId(); @@ -415,25 +481,25 @@ public class UnmanagedApplicationManager { } /** - * Gets the identifier of the unmanaged AM. + * Gets the amrmToken of the unmanaged AM. * - * @return the identifier of the unmanaged AM. + * @return the amrmToken of the unmanaged AM. * @throws IOException if getApplicationReport fails * @throws YarnException if getApplicationReport fails */ - protected UnmanagedAMIdentifier getUAMIdentifier() + protected Token<AMRMTokenIdentifier> getUAMToken() throws IOException, YarnException { Token<AMRMTokenIdentifier> token = null; org.apache.hadoop.yarn.api.records.Token amrmToken = - getApplicationReport(this.attemptId.getApplicationId()).getAMRMToken(); + getApplicationReport(this.applicationId).getAMRMToken(); if (amrmToken != null) { token = ConverterUtils.convertFromYarn(amrmToken, (Text) null); } else { LOG.warn( "AMRMToken not found in the application report for application: {}", - this.attemptId.getApplicationId()); + this.applicationId); } - return new UnmanagedAMIdentifier(this.attemptId, token); + return token; } private ApplicationReport getApplicationReport(ApplicationId appId) @@ -445,29 +511,6 @@ public class UnmanagedApplicationManager { } /** - * Data structure that encapsulates the application attempt identifier and the - * AMRMTokenIdentifier. Make it public because clients with HA need it. - */ - public static class UnmanagedAMIdentifier { - private ApplicationAttemptId attemptId; - private Token<AMRMTokenIdentifier> token; - - public UnmanagedAMIdentifier(ApplicationAttemptId attemptId, - Token<AMRMTokenIdentifier> token) { - this.attemptId = attemptId; - this.token = token; - } - - public ApplicationAttemptId getAttemptId() { - return this.attemptId; - } - - public Token<AMRMTokenIdentifier> getToken() { - return this.token; - } - } - - /** * Data structure that encapsulates AllocateRequest and AsyncCallback * instance. */ @@ -549,8 +592,10 @@ public class UnmanagedApplicationManager { } request.setResponseId(lastResponseId); + AllocateResponse response = AMRMClientUtils.allocateWithReRegister( - request, rmProxy, registerRequest, attemptId); + request, rmProxy, registerRequest, applicationId); + if (response == null) { throw new YarnException("Null allocateResponse from allocate"); } @@ -578,18 +623,17 @@ public class UnmanagedApplicationManager { LOG.debug("Interrupted while waiting for queue", ex); } } catch (IOException ex) { - LOG.warn( - "IO Error occurred while processing heart beat for " + attemptId, - ex); + LOG.warn("IO Error occurred while processing heart beat for " + + applicationId, ex); } catch (Throwable ex) { LOG.warn( - "Error occurred while processing heart beat for " + attemptId, + "Error occurred while processing heart beat for " + applicationId, ex); } } LOG.info("UnmanagedApplicationManager has been stopped for {}. " - + "AMRequestHandlerThread thread is exiting", attemptId); + + "AMRequestHandlerThread thread is exiting", applicationId); } } @@ -600,8 +644,8 @@ public class UnmanagedApplicationManager { implements UncaughtExceptionHandler { @Override public void uncaughtException(Thread t, Throwable e) { - LOG.error("Heartbeat thread {} for application attempt {} crashed!", - t.getName(), attemptId, e); + LOG.error("Heartbeat thread {} for application {} crashed!", + t.getName(), applicationId, e); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java index 7993bd8..3cecdca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/AMRMClientUtils.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; @@ -63,16 +63,16 @@ public final class AMRMClientUtils { /** * Handle ApplicationNotRegistered exception and re-register. * - * @param attemptId app attemptId + * @param appId application Id * @param rmProxy RM proxy instance * @param registerRequest the AM re-register request * @throws YarnException if re-register fails */ public static void handleNotRegisteredExceptionAndReRegister( - ApplicationAttemptId attemptId, ApplicationMasterProtocol rmProxy, + ApplicationId appId, ApplicationMasterProtocol rmProxy, RegisterApplicationMasterRequest registerRequest) throws YarnException { LOG.info("App attempt {} not registered, most likely due to RM failover. " - + " Trying to re-register.", attemptId); + + " Trying to re-register.", appId); try { rmProxy.registerApplicationMaster(registerRequest); } catch (Exception e) { @@ -93,25 +93,24 @@ public final class AMRMClientUtils { * @param request allocate request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return allocate response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static AllocateResponse allocateWithReRegister(AllocateRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.allocate(request); } catch (ApplicationMasterNotRegisteredException e) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // reset responseId after re-register request.setResponseId(0); // retry allocate - return allocateWithReRegister(request, rmProxy, registerRequest, - attemptId); + return allocateWithReRegister(request, rmProxy, registerRequest, appId); } } @@ -123,23 +122,22 @@ public final class AMRMClientUtils { * @param request finishApplicationMaster request * @param rmProxy RM proxy * @param registerRequest the register request for re-register - * @param attemptId application attempt id + * @param appId application id * @return finishApplicationMaster response * @throws YarnException if RM call fails * @throws IOException if RM call fails */ public static FinishApplicationMasterResponse finishAMWithReRegister( FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, - ApplicationAttemptId attemptId) throws YarnException, IOException { + RegisterApplicationMasterRequest registerRequest, ApplicationId appId) + throws YarnException, IOException { try { return rmProxy.finishApplicationMaster(request); } catch (ApplicationMasterNotRegisteredException ex) { - handleNotRegisteredExceptionAndReRegister(attemptId, rmProxy, + handleNotRegisteredExceptionAndReRegister(appId, rmProxy, registerRequest); // retry finishAM after re-register - return finishAMWithReRegister(request, rmProxy, registerRequest, - attemptId); + return finishAMWithReRegister(request, rmProxy, registerRequest, appId); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 628c781..b5727aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -177,10 +178,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, LoggerFactory.getLogger(MockResourceManagerFacade.class); private HashSet<ApplicationId> applicationMap = new HashSet<>(); - private HashMap<String, List<ContainerId>> applicationContainerIdMap = - new HashMap<String, List<ContainerId>>(); - private HashMap<ContainerId, Container> allocatedContainerMap = - new HashMap<ContainerId, Container>(); + private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>(); + private HashMap<ApplicationAttemptId, List<ContainerId>> + applicationContainerIdMap = new HashMap<>(); private AtomicInteger containerIndex = new AtomicInteger(0); private Configuration conf; private int subClusterId; @@ -221,7 +221,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, this.isRunning = mode; } - private static String getAppIdentifier() throws IOException { + private static ApplicationAttemptId getAppIdentifier() throws IOException { AMRMTokenIdentifier result = null; UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers(); @@ -231,7 +231,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, break; } } - return result != null ? result.getApplicationAttemptId().toString() : ""; + return result != null ? result.getApplicationAttemptId() + : ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0); } private void validateRunning() throws ConnectException { @@ -246,19 +247,32 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, throws YarnException, IOException { validateRunning(); - - String amrmToken = getAppIdentifier(); - LOG.info("Registering application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Registering application attempt: " + attemptId); shouldReRegisterNext = false; + List<Container> containersFromPreviousAttempt = null; + synchronized (applicationContainerIdMap) { - if (applicationContainerIdMap.containsKey(amrmToken)) { - throw new InvalidApplicationMasterRequestException( - AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + if (applicationContainerIdMap.containsKey(attemptId)) { + if (keepContainerOnUams.contains(attemptId.getApplicationId())) { + // For UAM with the keepContainersFromPreviousAttempt flag, return all + // running containers + containersFromPreviousAttempt = new ArrayList<>(); + for (ContainerId containerId : applicationContainerIdMap + .get(attemptId)) { + containersFromPreviousAttempt.add(Container.newInstance(containerId, + null, null, null, null, null)); + } + } else { + throw new InvalidApplicationMasterRequestException( + AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); + } + } else { + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(attemptId, new ArrayList<ContainerId>()); } - // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>()); } // Make sure we wait for certain test cases last in the method @@ -278,7 +292,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } return RegisterApplicationMasterResponse.newInstance(null, null, null, null, - null, request.getHost(), null); + containersFromPreviousAttempt, request.getHost(), null); } @Override @@ -288,8 +302,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); - String amrmToken = getAppIdentifier(); - LOG.info("Finishing application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Finishing application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -299,12 +313,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, synchronized (applicationContainerIdMap) { // Remove the containers that were being tracked for this application - Assert.assertTrue("The application id is NOT registered: " + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken); - for (ContainerId c : ids) { - allocatedContainerMap.remove(c); - } + Assert.assertTrue("The application id is NOT registered: " + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + applicationContainerIdMap.remove(attemptId); } return FinishApplicationMasterResponse.newInstance( @@ -334,8 +345,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, + "askList and releaseList in the same heartbeat"); } - String amrmToken = getAppIdentifier(); - LOG.info("Allocate from application attempt: " + amrmToken); + ApplicationAttemptId attemptId = getAppIdentifier(); + LOG.info("Allocate from application attempt: " + attemptId); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -367,16 +378,16 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, // will need it in future Assert.assertTrue( "The application id is Not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List<ContainerId> ids = applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List<ContainerId> ids = applicationContainerIdMap.get(attemptId); ids.add(containerId); - this.allocatedContainerMap.put(containerId, container); } } } } + List<ContainerStatus> completedList = new ArrayList<>(); if (request.getReleaseList() != null && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); @@ -384,9 +395,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, Assert .assertTrue( "The application id is not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List<ContainerId> ids = applicationContainerIdMap.get(amrmToken); + + attemptId, + applicationContainerIdMap.containsKey(attemptId)); + List<ContainerId> ids = applicationContainerIdMap.get(attemptId); for (ContainerId id : request.getReleaseList()) { boolean found = false; @@ -402,18 +413,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, + conf.get("AMRMTOKEN"), found); ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = ContainerId.newInstance( - getApplicationAttemptId(1), containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); + completedList.add( + ContainerStatus.newInstance(id, ContainerState.COMPLETE, "", 0)); } } } @@ -424,9 +425,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - return AllocateResponse.newInstance(0, new ArrayList<ContainerStatus>(), - containerList, new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, - 1, null, new ArrayList<NMToken>(), newAMRMToken, + return AllocateResponse.newInstance(0, completedList, containerList, + new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null, + new ArrayList<NMToken>(), newAMRMToken, new ArrayList<UpdatedContainer>()); } @@ -443,6 +444,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, report.setApplicationId(request.getApplicationId()); report.setCurrentApplicationAttemptId( ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], "")); response.setApplicationReport(report); return response; } @@ -486,6 +488,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } LOG.info("Application submitted: " + appId); applicationMap.add(appId); + + if (request.getApplicationSubmissionContext().getUnmanagedAM() + || request.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + keepContainerOnUams.add(appId); + } return SubmitApplicationResponse.newInstance(); } @@ -502,6 +510,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, throw new ApplicationNotFoundException( "Trying to kill an absent application: " + appId); } + keepContainerOnUams.remove(appId); } LOG.info("Force killing application: " + appId); return KillApplicationResponse.newInstance(true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java new file mode 100644 index 0000000..42be851 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for FederationRegistryClient. + */ +public class TestFederationRegistryClient { + private Configuration conf; + private UserGroupInformation user; + private RegistryOperations registry; + private FederationRegistryClient registryClient; + + @Before + public void setup() throws Exception { + this.conf = new YarnConfiguration(); + + this.registry = new FSRegistryOperationsService(); + this.registry.init(this.conf); + this.registry.start(); + + this.user = UserGroupInformation.getCurrentUser(); + this.registryClient = + new FederationRegistryClient(this.conf, this.registry, this.user); + this.registryClient.cleanAllApplications(); + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + } + + @After + public void breakDown() { + registryClient.cleanAllApplications(); + Assert.assertEquals(0, registryClient.getAllApplications().size()); + registry.stop(); + } + + @Test + public void testBasicCase() { + ApplicationId appId = ApplicationId.newInstance(0, 0); + String scId1 = "subcluster1"; + String scId2 = "subcluster2"; + + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token<AMRMTokenIdentifier>()); + this.registryClient.writeAMRMTokenForUAM(appId, scId2, + new Token<AMRMTokenIdentifier>()); + // Duplicate entry, should overwrite + this.registryClient.writeAMRMTokenForUAM(appId, scId1, + new Token<AMRMTokenIdentifier>()); + + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + Assert.assertEquals(2, + this.registryClient.loadStateFromRegistry(appId).size()); + + this.registryClient.removeAppFromRegistry(appId); + + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + Assert.assertEquals(0, + this.registryClient.loadStateFromRegistry(appId).size()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index 9159cf7..5848d3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -65,7 +65,7 @@ public class TestUnmanagedApplicationManager { ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); uam = new TestableUnmanagedApplicationManager(conf, - attemptId.getApplicationId(), null, "submitter", "appNameSuffix"); + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); } protected void waitForCallBackCountAndCheckZeroPending( @@ -88,7 +88,8 @@ public class TestUnmanagedApplicationManager { public void testBasicUsage() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, @@ -102,11 +103,48 @@ public class TestUnmanagedApplicationManager { attemptId); } + /* + * Test re-attaching of an existing UAM. This is for HA of UAM client. + */ + @Test(timeout = 5000) + public void testUAMReAttach() + throws YarnException, IOException, InterruptedException { + + launchUAM(attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 1); + + MockResourceManagerFacade rmProxy = uam.getRMProxy(); + uam = new TestableUnmanagedApplicationManager(conf, + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); + uam.setRMProxy(rmProxy); + + reAttachUAM(null, attemptId); + registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); + + allocateAsync(AllocateRequest.newInstance(0, 0, null, null, null), callback, + attemptId); + + // Wait for outstanding async allocate callback + waitForCallBackCountAndCheckZeroPending(callback, 2); + + finishApplicationMaster( + FinishApplicationMasterRequest.newInstance(null, null, null), + attemptId); + } + @Test(timeout = 5000) public void testReRegister() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.setShouldReRegisterNext(); @@ -137,7 +175,8 @@ public class TestUnmanagedApplicationManager { @Override public void run() { try { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null), attemptId); } catch (Exception e) { @@ -221,7 +260,8 @@ public class TestUnmanagedApplicationManager { @Test public void testForceKill() throws YarnException, IOException, InterruptedException { - createAndRegisterApplicationMaster( + launchUAM(attemptId); + registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 0, null), attemptId); uam.forceKillApplication(); @@ -241,19 +281,40 @@ public class TestUnmanagedApplicationManager { return ugi; } - protected RegisterApplicationMasterResponse - createAndRegisterApplicationMaster( - final RegisterApplicationMasterRequest request, - ApplicationAttemptId appAttemptId) - throws YarnException, IOException, InterruptedException { + protected Token<AMRMTokenIdentifier> launchUAM( + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + return getUGIWithToken(appAttemptId) + .doAs(new PrivilegedExceptionAction<Token<AMRMTokenIdentifier>>() { + @Override + public Token<AMRMTokenIdentifier> run() throws Exception { + return uam.launchUAM(); + } + }); + } + + protected void reAttachUAM(final Token<AMRMTokenIdentifier> uamToken, + ApplicationAttemptId appAttemptId) + throws IOException, InterruptedException { + getUGIWithToken(appAttemptId).doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Token<AMRMTokenIdentifier> run() throws Exception { + uam.reAttachUAM(uamToken); + return null; + } + }); + } + + protected RegisterApplicationMasterResponse registerApplicationMaster( + final RegisterApplicationMasterRequest request, + ApplicationAttemptId appAttemptId) + throws YarnException, IOException, InterruptedException { return getUGIWithToken(appAttemptId).doAs( new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() { @Override public RegisterApplicationMasterResponse run() throws YarnException, IOException { - RegisterApplicationMasterResponse response = - uam.createAndRegisterApplicationMaster(request); - return response; + return uam.registerApplicationMaster(request); } }); } @@ -311,8 +372,9 @@ public class TestUnmanagedApplicationManager { public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix) { - super(conf, appId, queueName, submitter, appNameSuffix); + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + super(conf, appId, queueName, submitter, appNameSuffix, + keepContainersAcrossApplicationAttempts); } @SuppressWarnings("unchecked") @@ -330,6 +392,14 @@ public class TestUnmanagedApplicationManager { rmProxy.setShouldReRegisterNext(); } } + + public MockResourceManagerFacade getRMProxy() { + return rmProxy; + } + + public void setRMProxy(MockResourceManagerFacade proxy) { + this.rmProxy = proxy; + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java index c355a8b..92afcb7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -67,4 +69,18 @@ public interface AMRMProxyApplicationContext { */ Context getNMCotext(); + /** + * Gets the credentials of this application. + * + * @return the credentials. + */ + Credentials getCredentials(); + + /** + * Gets the registry client. + * + * @return the registry. + */ + RegistryOperations getRegistryClient(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5f66888/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java index 9938b37..8a02095 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -42,6 +44,8 @@ public class AMRMProxyApplicationContextImpl implements private Integer localTokenKeyId; private Token<AMRMTokenIdentifier> amrmToken; private Token<AMRMTokenIdentifier> localToken; + private Credentials credentials; + private RegistryOperations registry; /** * Create an instance of the AMRMProxyApplicationContext. @@ -52,17 +56,23 @@ public class AMRMProxyApplicationContextImpl implements * @param user user name of the application * @param amrmToken amrmToken issued by RM * @param localToken amrmToken issued by AMRMProxy + * @param credentials application credentials + * @param registry Yarn Registry client */ - public AMRMProxyApplicationContextImpl(Context nmContext, - Configuration conf, ApplicationAttemptId applicationAttemptId, - String user, Token<AMRMTokenIdentifier> amrmToken, - Token<AMRMTokenIdentifier> localToken) { + @SuppressWarnings("checkstyle:parameternumber") + public AMRMProxyApplicationContextImpl(Context nmContext, Configuration conf, + ApplicationAttemptId applicationAttemptId, String user, + Token<AMRMTokenIdentifier> amrmToken, + Token<AMRMTokenIdentifier> localToken, Credentials credentials, + RegistryOperations registry) { this.nmContext = nmContext; this.conf = conf; this.applicationAttemptId = applicationAttemptId; this.user = user; this.amrmToken = amrmToken; this.localToken = localToken; + this.credentials = credentials; + this.registry = registry; } @Override @@ -88,11 +98,14 @@ public class AMRMProxyApplicationContextImpl implements /** * Sets the application's AMRMToken. * - * @param amrmToken amrmToken issued by RM + * @param amrmToken the new amrmToken from RM + * @return whether the saved token is updated to a different value */ - public synchronized void setAMRMToken( + public synchronized boolean setAMRMToken( Token<AMRMTokenIdentifier> amrmToken) { + Token<AMRMTokenIdentifier> oldValue = this.amrmToken; this.amrmToken = amrmToken; + return !this.amrmToken.equals(oldValue); } @Override @@ -134,4 +147,14 @@ public class AMRMProxyApplicationContextImpl implements public Context getNMCotext() { return nmContext; } + + @Override + public Credentials getCredentials() { + return this.credentials; + } + + @Override + public RegistryOperations getRegistryClient() { + return this.registry; + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org