YARN-2884. Added a proxy service in NM to proxy the communication between AM and RM. Contributed by Kishore Chaliparambil
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f72f1e6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f72f1e6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f72f1e6 Branch: refs/heads/trunk Commit: 6f72f1e6003ab11679bebeb96f27f1f62b3b3e02 Parents: 9b78e6e Author: Jian He <[email protected]> Authored: Tue Sep 8 09:35:46 2015 +0800 Committer: Jian He <[email protected]> Committed: Tue Sep 8 09:35:46 2015 +0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 17 + .../yarn/conf/TestYarnConfigurationFields.java | 2 + .../src/main/resources/yarn-default.xml | 34 + .../server/utils/YarnServerSecurityUtils.java | 142 ++++ .../amrmproxy/AMRMProxyApplicationContext.java | 70 ++ .../AMRMProxyApplicationContextImpl.java | 132 ++++ .../nodemanager/amrmproxy/AMRMProxyService.java | 592 ++++++++++++++++ .../amrmproxy/AMRMProxyTokenSecretManager.java | 265 ++++++++ .../amrmproxy/AbstractRequestInterceptor.java | 102 +++ .../amrmproxy/DefaultRequestInterceptor.java | 138 ++++ .../amrmproxy/RequestInterceptor.java | 71 ++ .../containermanager/ContainerManagerImpl.java | 67 +- .../amrmproxy/BaseAMRMProxyTest.java | 677 +++++++++++++++++++ .../amrmproxy/MockRequestInterceptor.java | 65 ++ .../amrmproxy/MockResourceManagerFacade.java | 469 +++++++++++++ .../PassThroughRequestInterceptor.java | 58 ++ .../amrmproxy/TestAMRMProxyService.java | 484 +++++++++++++ .../ApplicationMasterService.java | 69 +- 19 files changed, 3366 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a162070..be1feb4 
100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -187,6 +187,9 @@ Release 2.8.0 - UNRELEASED YARN-3970. Add REST api support for Application Priority. (Naganarasimha G R via vvasudev) + YARN-2884. Added a proxy service in NM to proxy the the communication + between AM and RM. (Kishore Chaliparambil via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4879ca1..9ec25ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1332,6 +1332,23 @@ public class YarnConfiguration extends Configuration { public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX + "application.classpath"; + public static final String AMRM_PROXY_ENABLED = NM_PREFIX + + "amrmproxy.enable"; + public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false; + public static final String AMRM_PROXY_ADDRESS = NM_PREFIX + + "amrmproxy.address"; + public static final int DEFAULT_AMRM_PROXY_PORT = 8048; + public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:" + + DEFAULT_AMRM_PROXY_PORT; + public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX + + "amrmproxy.client.thread-count"; + public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25; + public static final String 
AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = + NM_PREFIX + "amrmproxy.interceptor-class.pipeline"; + public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE = + "org.apache.hadoop.yarn.server.nodemanager.amrmproxy." + + "DefaultRequestInterceptor"; + /** * Default platform-agnostic CLASSPATH for YARN applications. A * comma-separated list of CLASSPATH entries. The parameter expansion marker http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index e89a90d..97fcfa1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -86,6 +86,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { .add(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS); configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL); + configurationPropsToSkipCompare + .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); // Ignore all YARN Application Timeline Service (version 1) properties configurationPrefixToSkipCompare.add("yarn.timeline-service."); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 59bfb56..b76defb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2259,4 +2259,38 @@ <value></value> </property> + <property> + <description> + Enable/Disable AMRMProxyService in the node manager. This service is used to intercept + calls from the application masters to the resource manager. + </description> + <name>yarn.nodemanager.amrmproxy.enable</name> + <value>false</value> + </property> + + <property> + <description> + The address of the AMRMProxyService listener. + </description> + <name>yarn.nodemanager.amrmproxy.address</name> + <value>0.0.0.0:8048</value> + </property> + + <property> + <description> + The number of threads used to handle requests by the AMRMProxyService. + </description> + <name>yarn.nodemanager.amrmproxy.client.thread-count</name> + <value>25</value> + </property> + + <property> + <description> + The comma separated list of class names that implement the RequestInterceptor interface. This is used by the + AMRMProxyService to create the request processing pipeline for applications. 
+ </description> + <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name> + <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java new file mode 100644 index 0000000..9af556e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.utils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that contains commonly used server methods. + * + */ +@Private +public final class YarnServerSecurityUtils { + private static final Logger LOG = LoggerFactory + .getLogger(YarnServerSecurityUtils.class); + + private YarnServerSecurityUtils() { + } + + /** + * Authorizes the current request and returns the AMRMTokenIdentifier for the + * current application. + * + * @return the AMRMTokenIdentifier instance for the current user + * @throws YarnException + */ + public static AMRMTokenIdentifier authorizeRequest() + throws YarnException { + + UserGroupInformation remoteUgi; + try { + remoteUgi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + String msg = + "Cannot obtain the user-name for authorizing ApplicationMaster. 
" + + "Got exception: " + StringUtils.stringifyException(e); + LOG.warn(msg); + throw RPCUtil.getRemoteException(msg); + } + + boolean tokenFound = false; + String message = ""; + AMRMTokenIdentifier appTokenIdentifier = null; + try { + appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi); + if (appTokenIdentifier == null) { + tokenFound = false; + message = "No AMRMToken found for user " + remoteUgi.getUserName(); + } else { + tokenFound = true; + } + } catch (IOException e) { + tokenFound = false; + message = + "Got exception while looking for AMRMToken for user " + + remoteUgi.getUserName(); + } + + if (!tokenFound) { + LOG.warn(message); + throw RPCUtil.getRemoteException(message); + } + + return appTokenIdentifier; + } + + // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer + // currently sets only the required id, but iterate through anyways just to be + // sure. + private static AMRMTokenIdentifier selectAMRMTokenIdentifier( + UserGroupInformation remoteUgi) throws IOException { + AMRMTokenIdentifier result = null; + Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + + return result; + } + + /** + * Parses the container launch context and returns a Credential instance that + * contains all the tokens from the launch context. + * @param launchContext + * @return the credential instance + * @throws IOException + */ + public static Credentials parseCredentials( + ContainerLaunchContext launchContext) throws IOException { + Credentials credentials = new Credentials(); + ByteBuffer tokens = launchContext.getTokens(); + + if (tokens != null) { + DataInputByteBuffer buf = new DataInputByteBuffer(); + tokens.rewind(); + buf.reset(tokens); + credentials.readTokenStorageStream(buf); + if (LOG.isDebugEnabled()) { + for (Token<? 
extends TokenIdentifier> tk : credentials + .getAllTokens()) { + LOG.debug(tk.getService() + " = " + tk.toString()); + } + } + } + + return credentials; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java new file mode 100644 index 0000000..c355a8b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.Context; + +/** + * Interface that can be used by the intercepter plugins to get the information + * about one application. + * + */ +public interface AMRMProxyApplicationContext { + + /** + * Gets the configuration object instance. + * @return the configuration object. + */ + Configuration getConf(); + + /** + * Gets the application attempt identifier. + * @return the application attempt identifier. + */ + ApplicationAttemptId getApplicationAttemptId(); + + /** + * Gets the application submitter. + * @return the application submitter + */ + String getUser(); + + /** + * Gets the application's AMRMToken that is issued by the RM. + * @return the application's AMRMToken that is issued by the RM. + */ + Token<AMRMTokenIdentifier> getAMRMToken(); + + /** + * Gets the application's local AMRMToken issued by the proxy service. + * @return the application's local AMRMToken issued by the proxy service. + */ + Token<AMRMTokenIdentifier> getLocalAMRMToken(); + + /** + * Gets the NMContext object. + * @return the NMContext. 
+ */ + Context getNMCotext(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java new file mode 100644 index 0000000..2e5aa94 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.Context; + +/** + * Encapsulates the information about one application that is needed by the + * request intercepters. + * + */ +public class AMRMProxyApplicationContextImpl implements + AMRMProxyApplicationContext { + private final Configuration conf; + private final Context nmContext; + private final ApplicationAttemptId applicationAttemptId; + private final String user; + private Integer localTokenKeyId; + private Token<AMRMTokenIdentifier> amrmToken; + private Token<AMRMTokenIdentifier> localToken; + + /** + * Create an instance of the AMRMProxyApplicationContext. + * + * @param nmContext + * @param conf + * @param applicationAttemptId + * @param user + * @param amrmToken + */ + public AMRMProxyApplicationContextImpl(Context nmContext, + Configuration conf, ApplicationAttemptId applicationAttemptId, + String user, Token<AMRMTokenIdentifier> amrmToken, + Token<AMRMTokenIdentifier> localToken) { + this.nmContext = nmContext; + this.conf = conf; + this.applicationAttemptId = applicationAttemptId; + this.user = user; + this.amrmToken = amrmToken; + this.localToken = localToken; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } + + @Override + public String getUser() { + return user; + } + + @Override + public synchronized Token<AMRMTokenIdentifier> getAMRMToken() { + return amrmToken; + } + + /** + * Sets the application's AMRMToken. 
+ */ + public synchronized void setAMRMToken( + Token<AMRMTokenIdentifier> amrmToken) { + this.amrmToken = amrmToken; + } + + @Override + public synchronized Token<AMRMTokenIdentifier> getLocalAMRMToken() { + return this.localToken; + } + + /** + * Sets the application's local AMRMToken issued by the proxy service. + */ + public synchronized void setLocalAMRMToken( + Token<AMRMTokenIdentifier> localToken) { + this.localToken = localToken; + this.localTokenKeyId = null; + } + + @Private + public synchronized int getLocalAMRMTokenKeyId() { + Integer keyId = this.localTokenKeyId; + if (keyId == null) { + try { + if (this.localToken == null) { + throw new YarnRuntimeException("Missing AMRM token for " + + this.applicationAttemptId); + } + keyId = this.localToken.decodeIdentifier().getKeyId(); + this.localTokenKeyId = keyId; + } catch (IOException e) { + throw new YarnRuntimeException("AMRM token decode error for " + + this.applicationAttemptId, e); + } + } + return keyId; + } + + @Override + public Context getNMCotext() { + return nmContext; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java new file mode 100644 index 0000000..bd6538c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -0,0 +1,592 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.security.MasterKeyData; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * AMRMProxyService is a service that runs on each node manager that can be used + * to intercept and inspect messages from application master to the cluster + * resource manager. It listens to messages from the application master and + * creates a request intercepting pipeline instance for each application. 
The + * pipeline is a chain of intercepter instances that can inspect and modify the + * request/response as needed. + */ +public class AMRMProxyService extends AbstractService implements + ApplicationMasterProtocol { + private static final Logger LOG = LoggerFactory + .getLogger(AMRMProxyService.class); + private Server server; + private final Context nmContext; + private final AsyncDispatcher dispatcher; + private InetSocketAddress listenerEndpoint; + private AMRMProxyTokenSecretManager secretManager; + private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap; + + /** + * Creates an instance of the service. + * + * @param nmContext + * @param dispatcher + */ + public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) { + super(AMRMProxyService.class.getName()); + Preconditions.checkArgument(nmContext != null, "nmContext is null"); + Preconditions.checkArgument(dispatcher != null, "dispatcher is null"); + this.nmContext = nmContext; + this.dispatcher = dispatcher; + this.applPipelineMap = + new ConcurrentHashMap<ApplicationId, RequestInterceptorChainWrapper>(); + + this.dispatcher.register(ApplicationEventType.class, + new ApplicationEventHandler()); + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Starting AMRMProxyService"); + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation.setConfiguration(conf); + + this.listenerEndpoint = + conf.getSocketAddr(YarnConfiguration.AMRM_PROXY_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_PORT); + + Configuration serverConf = new Configuration(conf); + serverConf.set( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); + + int numWorkerThreads = + serverConf.getInt( + YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT); + + this.secretManager = new 
AMRMProxyTokenSecretManager(serverConf); + this.secretManager.start(); + + this.server = + rpc.getServer(ApplicationMasterProtocol.class, this, + listenerEndpoint, serverConf, this.secretManager, + numWorkerThreads); + + this.server.start(); + LOG.info("AMRMProxyService listening on address: " + + this.server.getListenerAddress()); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping AMRMProxyService"); + if (this.server != null) { + this.server.stop(); + } + + this.secretManager.stop(); + + super.serviceStop(); + } + + /** + * This is called by the AMs started on this node to register with the RM. + * This method does the initial authorization and then forwards the request to + * the application instance specific intercepter chain. + */ + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Registering application master." + " Host:" + + request.getHost() + " Port:" + request.getRpcPort() + + " Tracking Url:" + request.getTrackingUrl()); + RequestInterceptorChainWrapper pipeline = + authorizeAndGetInterceptorChain(); + return pipeline.getRootInterceptor() + .registerApplicationMaster(request); + } + + /** + * This is called by the AMs started on this node to unregister from the RM. + * This method does the initial authorization and then forwards the request to + * the application instance specific intercepter chain. + */ + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Finishing application master. 
Tracking Url:" + + request.getTrackingUrl()); + RequestInterceptorChainWrapper pipeline = + authorizeAndGetInterceptorChain(); + return pipeline.getRootInterceptor().finishApplicationMaster(request); + } + + /** + * This is called by the AMs started on this node to send heart beat to RM. + * This method does the initial authorization and then forwards the request to + * the application instance specific pipeline, which is a chain of request + * intercepter objects. One application request processing pipeline is created + * per AM instance. + */ + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + AMRMTokenIdentifier amrmTokenIdentifier = + YarnServerSecurityUtils.authorizeRequest(); + RequestInterceptorChainWrapper pipeline = + getInterceptorChain(amrmTokenIdentifier); + AllocateResponse allocateResponse = + pipeline.getRootInterceptor().allocate(request); + + updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse); + + return allocateResponse; + } + + /** + * Callback from the ContainerManager implementation for initializing the + * application request processing pipeline. 
+ * + * @param request - encapsulates information for starting an AM + * @throws IOException + * @throws YarnException + */ + public void processApplicationStartRequest(StartContainerRequest request) + throws IOException, YarnException { + LOG.info("Callback received for initializing request " + + "processing pipeline for an AM"); + ContainerTokenIdentifier containerTokenIdentifierForKey = + BuilderUtils.newContainerTokenIdentifier(request + .getContainerToken()); + ApplicationAttemptId appAttemptId = + containerTokenIdentifierForKey.getContainerID() + .getApplicationAttemptId(); + Credentials credentials = + YarnServerSecurityUtils.parseCredentials(request + .getContainerLaunchContext()); + + Token<AMRMTokenIdentifier> amrmToken = + getFirstAMRMToken(credentials.getAllTokens()); + if (amrmToken == null) { + throw new YarnRuntimeException( + "AMRMToken not found in the start container request for application:" + + appAttemptId.toString()); + } + + // Substitute the existing AMRM Token with a local one. Keep the rest of the + // tokens in the credentials intact. + Token<AMRMTokenIdentifier> localToken = + this.secretManager.createAndGetAMRMToken(appAttemptId); + credentials.addToken(localToken.getService(), localToken); + + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + request.getContainerLaunchContext().setTokens( + ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + + initializePipeline(containerTokenIdentifierForKey.getContainerID() + .getApplicationAttemptId(), + containerTokenIdentifierForKey.getApplicationSubmitter(), + amrmToken, localToken); + } + + /** + * Initializes the request intercepter pipeline for the specified application. 
+   *
+   * If building or initializing the chain fails, the entry registered for the
+   * application is removed again before the exception is rethrown, so that no
+   * half-initialized pipeline stays behind in the map.
+   *
+   * @param applicationAttemptId attempt id of the AM; its application id is the
+   *          key under which the pipeline is registered
+   * @param user user name handed to the application context
+   * @param amrmToken AMRMToken handed to the application context
+   * @param localToken local AMRMToken handed to the application context
+   */
+  protected void initializePipeline(
+      ApplicationAttemptId applicationAttemptId, String user,
+      Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    ApplicationId applicationId = applicationAttemptId.getApplicationId();
+    RequestInterceptorChainWrapper chainWrapper = null;
+    synchronized (applPipelineMap) {
+      if (applPipelineMap.containsKey(applicationId)) {
+        LOG.warn("Request to start an already existing appId was received. "
+            + " This can happen if an application failed and a new attempt "
+            + "was created on this machine. ApplicationId: "
+            + applicationAttemptId.toString());
+        return;
+      }
+
+      chainWrapper = new RequestInterceptorChainWrapper();
+      this.applPipelineMap.put(applicationId, chainWrapper);
+    }
+
+    // We register the pipeline instance in the map first and then initialize it
+    // later because chain initialization can be expensive and we would like to
+    // release the lock as soon as possible to prevent other applications from
+    // blocking when one application's chain is initializing
+    LOG.info("Initializing request processing pipeline for application. "
+        + " ApplicationId:" + applicationAttemptId + " for the user: "
+        + user);
+
+    try {
+      RequestInterceptor interceptorChain =
+          this.createRequestInterceptorChain();
+      interceptorChain.init(createApplicationMasterContext(
+          applicationAttemptId, user, amrmToken, localToken));
+      chainWrapper.init(interceptorChain, applicationAttemptId);
+    } catch (RuntimeException e) {
+      // Without this cleanup the map would keep a wrapper whose root
+      // interceptor is null: later AM calls would fail on it and a retry
+      // would be rejected as "already existing". Only drop the entry if it
+      // is still the one registered above.
+      synchronized (applPipelineMap) {
+        if (this.applPipelineMap.get(applicationId) == chainWrapper) {
+          this.applPipelineMap.remove(applicationId);
+        }
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Shuts down the request processing pipeline for the specified application
+   * and removes it from the pipeline map. A failure while shutting down the
+   * interceptors is logged and not propagated.
+   *
+   * @param applicationId id of the application whose pipeline is stopped;
+   *          must not be null
+   */
+  protected void stopApplication(ApplicationId applicationId) {
+    Preconditions.checkArgument(applicationId != null,
+        "applicationId is null");
+    RequestInterceptorChainWrapper pipeline =
+        this.applPipelineMap.remove(applicationId);
+
+    if (pipeline == null) {
+      LOG.info("Request to stop an application that does not exist. Id:"
+          + applicationId);
+    } else {
+      LOG.info("Stopping the request processing pipeline for application: "
+          + applicationId);
+      try {
+        pipeline.getRootInterceptor().shutdown();
+      } catch (Throwable ex) {
+        LOG.warn(
+            "Failed to shutdown the request processing pipeline for app:"
+                + applicationId, ex);
+      }
+    }
+  }
+
+  /**
+   * Reconciles the AMRMTokens after an allocate call. A token issued by the RM
+   * in the response is stored in the application context and removed from the
+   * response. When the local secret manager has a next master key whose id
+   * differs from the key id of the caller's token, the response is given the
+   * local AMRMToken, which is re-created first if the context still holds one
+   * for another key id.
+   *
+   * @param amrmTokenIdentifier identifier of the token the AM authenticated
+   *          with
+   * @param pipeline the pipeline of the calling application
+   * @param allocateResponse the response about to be returned to the AM;
+   *          modified in place
+   */
+  private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
+      RequestInterceptorChainWrapper pipeline,
+      AllocateResponse allocateResponse) {
+    AMRMProxyApplicationContextImpl context =
+        (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor()
+            .getApplicationContext();
+
+    // check to see if the RM has issued a new AMRMToken & accordingly update
+    // the real AMRMToken in the current context
+    if (allocateResponse.getAMRMToken() != null) {
+      org.apache.hadoop.yarn.api.records.Token token =
+          allocateResponse.getAMRMToken();
+
+      // The RM-issued token is meant for the proxy. Do not propagate it back
+      // to the AM, which only ever works with the local token.
+      allocateResponse.setAMRMToken(null);
+
+      org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
+          new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+              token.getIdentifier().array(), token.getPassword().array(),
+              new Text(token.getKind()), new Text(token.getService()));
+
+      context.setAMRMToken(newTokenId);
+    }
+
+    // Check if the local AMRMToken is rolled up and update the context and
+    // response accordingly
+    MasterKeyData nextMasterKey =
+        this.secretManager.getNextMasterKeyData();
+
+    if (nextMasterKey != null
+        && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+            .getKeyId()) {
+      Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
+      if (nextMasterKey.getMasterKey().getKeyId() != context
+          .getLocalAMRMTokenKeyId()) {
+        LOG.info("The local AMRMToken has been rolled-over."
+            + " Send new local AMRMToken back to application: "
+            + pipeline.getApplicationId());
+        localToken =
+            this.secretManager.createAndGetAMRMToken(pipeline
+                .getApplicationAttemptId());
+        context.setLocalAMRMToken(localToken);
+      }
+
+      allocateResponse
+          .setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+              .newInstance(localToken.getIdentifier(), localToken
+                  .getKind().toString(), localToken.getPassword(),
+                  localToken.getService().toString()));
+    }
+  }
+
+  /**
+   * Creates the application context that is handed to the interceptor chain
+   * of one application attempt.
+   *
+   * @param applicationAttemptId attempt id of the AM
+   * @param user user name stored in the context
+   * @param amrmToken AMRMToken stored in the context
+   * @param localToken local AMRMToken stored in the context
+   * @return the newly created context
+   */
+  private AMRMProxyApplicationContext createApplicationMasterContext(
+      ApplicationAttemptId applicationAttemptId, String user,
+      Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    AMRMProxyApplicationContextImpl appContext =
+        new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
+            applicationAttemptId, user, amrmToken, localToken);
+    return appContext;
+  }
+
+  /**
+   * Gets the Request intercepter chains for all the applications.
+   *
+   * @return the request intercepter chains.
+   */
+  protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
+    return this.applPipelineMap;
+  }
+
+  /**
+   * This method creates and returns reference of the first intercepter in the
+   * chain of request intercepter instances.
+   *
+   * The class names are read from the configuration and instantiated in the
+   * configured order; each instance is linked to its predecessor through
+   * setNextInterceptor.
+   *
+   * @return the reference of the first intercepter in the chain
+   * @throws YarnRuntimeException if a configured class cannot be found, is not
+   *           a RequestInterceptor, or no class is configured at all
+   */
+  protected RequestInterceptor createRequestInterceptorChain() {
+    Configuration conf = getConfig();
+
+    RequestInterceptor head = null;
+    RequestInterceptor tail = null;
+    for (String className : getInterceptorClassNames(conf)) {
+      Class<?> candidate;
+      try {
+        candidate = conf.getClassByName(className);
+      } catch (ClassNotFoundException e) {
+        throw new YarnRuntimeException(
+            "Could not instantiate ApplicationMasterRequestInterceptor: "
+                + className, e);
+      }
+
+      if (!RequestInterceptor.class.isAssignableFrom(candidate)) {
+        throw new YarnRuntimeException("Class: " + className
+            + " not instance of "
+            + RequestInterceptor.class.getCanonicalName());
+      }
+
+      RequestInterceptor link =
+          (RequestInterceptor) ReflectionUtils.newInstance(candidate, conf);
+      if (head == null) {
+        // The first configured interceptor becomes the root of the chain.
+        head = link;
+      } else {
+        tail.setNextInterceptor(link);
+      }
+      tail = link;
+    }
+
+    if (head == null) {
+      throw new YarnRuntimeException(
+          "RequestInterceptor pipeline is not configured in the system");
+    }
+    return head;
+  }
+
+  /**
+   * Returns the comma separated intercepter class names from the configuration.
+   *
+   * @param conf the configuration to read the pipeline setting from
+   * @return the trimmed intercepter class names as an instance of ArrayList
+   */
+  private List<String> getInterceptorClassNames(Configuration conf) {
+    Collection<String> configuredNames =
+        StringUtils.getStringCollection(conf.get(
+            YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE));
+
+    List<String> trimmedNames = new ArrayList<String>();
+    for (String configuredName : configuredNames) {
+      trimmedNames.add(configuredName.trim());
+    }
+    return trimmedNames;
+  }
+
+  /**
+   * Authorizes the request and returns the application specific request
+   * processing pipeline.
+   *
+   * @return the intercepter wrapper instance
+   * @throws YarnException
+   */
+  private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
+      throws YarnException {
+    return getInterceptorChain(YarnServerSecurityUtils.authorizeRequest());
+  }
+
+  /**
+   * Looks up the pipeline registered for the application of the given token.
+   *
+   * @param tokenIdentifier identifier of the caller's AMRMToken
+   * @return the intercepter wrapper registered for the application
+   * @throws YarnException if no pipeline is registered for the application
+   */
+  private RequestInterceptorChainWrapper getInterceptorChain(
+      AMRMTokenIdentifier tokenIdentifier) throws YarnException {
+    ApplicationId applicationId =
+        tokenIdentifier.getApplicationAttemptId().getApplicationId();
+
+    synchronized (this.applPipelineMap) {
+      if (this.applPipelineMap.containsKey(applicationId)) {
+        return this.applPipelineMap.get(applicationId);
+      }
+    }
+
+    throw new YarnException(
+        "The AM request processing pipeline is not initialized for app: "
+            + applicationId.toString());
+  }
+
+  /**
+   * Returns the first token whose kind is the AMRMToken kind.
+   *
+   * @param allTokens the tokens to search
+   * @return the first AMRMToken, or null if there is none
+   */
+  @SuppressWarnings("unchecked")
+  private Token<AMRMTokenIdentifier> getFirstAMRMToken(
+      Collection<Token<? extends TokenIdentifier>> allTokens) {
+    for (Token<? extends TokenIdentifier> candidate : allTokens) {
+      if (candidate.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        return (Token<AMRMTokenIdentifier>) candidate;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Private class for handling application stop events.
+   *
+   */
+  class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
+
+    @Override
+    public void handle(ApplicationEvent event) {
+      // Events for applications the NM context does not know are only logged.
+      if (AMRMProxyService.this.nmContext.getApplications().get(
+          event.getApplicationID()) == null) {
+        LOG.warn("Event " + event + " sent to absent application "
+            + event.getApplicationID());
+        return;
+      }
+
+      switch (event.getType()) {
+      case FINISH_APPLICATION:
+        LOG.info("Application stop event received for stopping AppId:"
+            + event.getApplicationID().toString());
+        AMRMProxyService.this.stopApplication(event.getApplicationID());
+        break;
+      default:
+        LOG.debug("AMRMProxy is ignoring event: " + event.getType());
+        break;
+      }
+    }
+  }
+
+  /**
+   * Private structure for encapsulating RequestInterceptor and
+   * ApplicationAttemptId instances.
+   *
+   */
+  private static class RequestInterceptorChainWrapper {
+    private RequestInterceptor rootInterceptor;
+    private ApplicationAttemptId applicationAttemptId;
+
+    /**
+     * Initializes the wrapper with the specified parameters.
+     *
+     * @param rootInterceptor
+     * @param applicationAttemptId
+     */
+    public synchronized void init(RequestInterceptor rootInterceptor,
+        ApplicationAttemptId applicationAttemptId) {
+      this.rootInterceptor = rootInterceptor;
+      this.applicationAttemptId = applicationAttemptId;
+    }
+
+    /**
+     * Gets the root request intercepter.
+     *
+     * @return the root request intercepter
+     */
+    public synchronized RequestInterceptor getRootInterceptor() {
+      return rootInterceptor;
+    }
+
+    /**
+     * Gets the application attempt identifier.
+ * + * @return the application attempt identifier + */ + public synchronized ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } + + /** + * Gets the application identifier. + * + * @return the application identifier + */ + public synchronized ApplicationId getApplicationId() { + return applicationAttemptId.getApplicationId(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java new file mode 100644 index 0000000..d09ce41 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.security.SecureRandom; +import java.util.HashSet; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.security.MasterKeyData; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This secret manager instance is used by the AMRMProxyService to generate and + * manage tokens. 
+ */ +public class AMRMProxyTokenSecretManager extends + SecretManager<AMRMTokenIdentifier> { + + private static final Log LOG = LogFactory + .getLog(AMRMProxyTokenSecretManager.class); + + private int serialNo = new SecureRandom().nextInt(); + private MasterKeyData nextMasterKey; + private MasterKeyData currentMasterKey; + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + private final Timer timer; + private final long rollingInterval; + private final long activationDelay; + + private final Set<ApplicationAttemptId> appAttemptSet = + new HashSet<ApplicationAttemptId>(); + + /** + * Create an {@link AMRMProxyTokenSecretManager}. + */ + public AMRMProxyTokenSecretManager(Configuration conf) { + this.timer = new Timer(); + this.rollingInterval = + conf.getLong( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, + YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000; + // Adding delay = 1.5 * expiry interval makes sure that all active AMs get + // the updated shared-key. 
+ this.activationDelay = + (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5); + LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval + + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + + " ms"); + if (rollingInterval <= activationDelay * 2) { + throw new IllegalArgumentException( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS + + " should be more than 3 X " + + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS); + } + } + + public void start() { + if (this.currentMasterKey == null) { + this.currentMasterKey = createNewMasterKey(); + } + this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval, + rollingInterval); + } + + public void stop() { + this.timer.cancel(); + } + + public void applicationMasterFinished(ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Application finished, removing password for " + + appAttemptId); + this.appAttemptSet.remove(appAttemptId); + } finally { + this.writeLock.unlock(); + } + } + + private class MasterKeyRoller extends TimerTask { + @Override + public void run() { + rollMasterKey(); + } + } + + @Private + void rollMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Rolling master-key for amrm-tokens"); + this.nextMasterKey = createNewMasterKey(); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } finally { + this.writeLock.unlock(); + } + } + + private class NextKeyActivator extends TimerTask { + @Override + public void run() { + activateNextMasterKey(); + } + } + + public void activateNextMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Activating next master key with id: " + + this.nextMasterKey.getMasterKey().getKeyId()); + this.currentMasterKey = this.nextMasterKey; + this.nextMasterKey = null; + } finally { + this.writeLock.unlock(); + } + } + + @Private + @VisibleForTesting + public MasterKeyData createNewMasterKey() { + 
this.writeLock.lock(); + try { + return new MasterKeyData(serialNo++, generateSecret()); + } finally { + this.writeLock.unlock(); + } + } + + public Token<AMRMTokenIdentifier> createAndGetAMRMToken( + ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId); + AMRMTokenIdentifier identifier = + new AMRMTokenIdentifier(appAttemptId, getMasterKey() + .getMasterKey().getKeyId()); + byte[] password = this.createPassword(identifier); + appAttemptSet.add(appAttemptId); + return new Token<AMRMTokenIdentifier>(identifier.getBytes(), + password, identifier.getKind(), new Text()); + } finally { + this.writeLock.unlock(); + } + } + + // If nextMasterKey is not Null, then return nextMasterKey + // otherwise return currentMasterKey. + @VisibleForTesting + public MasterKeyData getMasterKey() { + this.readLock.lock(); + try { + return nextMasterKey == null ? currentMasterKey : nextMasterKey; + } finally { + this.readLock.unlock(); + } + } + + /** + * Retrieve the password for the given {@link AMRMTokenIdentifier}. Used by + * RPC layer to validate a remote {@link AMRMTokenIdentifier}. 
+ */ + @Override + public byte[] retrievePassword(AMRMTokenIdentifier identifier) + throws InvalidToken { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to retrieve password for " + + applicationAttemptId); + } + if (!appAttemptSet.contains(applicationAttemptId)) { + throw new InvalidToken(applicationAttemptId + + " not found in AMRMProxyTokenSecretManager."); + } + if (identifier.getKeyId() == this.currentMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.currentMasterKey.getSecretKey()); + } else if (nextMasterKey != null + && identifier.getKeyId() == this.nextMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.nextMasterKey.getSecretKey()); + } + throw new InvalidToken("Invalid AMRMToken from " + + applicationAttemptId); + } finally { + this.readLock.unlock(); + } + } + + /** + * Creates an empty TokenId to be used for de-serializing an + * {@link AMRMTokenIdentifier} by the RPC layer. 
+ */ + @Override + public AMRMTokenIdentifier createIdentifier() { + return new AMRMTokenIdentifier(); + } + + @Private + @VisibleForTesting + public MasterKeyData getNextMasterKeyData() { + this.readLock.lock(); + try { + return this.nextMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Override + @Private + protected byte[] createPassword(AMRMTokenIdentifier identifier) { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + LOG.info("Creating password for " + applicationAttemptId); + return createPassword(identifier.getBytes(), getMasterKey() + .getSecretKey()); + } finally { + this.readLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java new file mode 100644 index 0000000..810dfa8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +/** + * Implements the RequestInterceptor interface and provides common functionality + * which can can be used and/or extended by other concrete intercepter classes. + * + */ +public abstract class AbstractRequestInterceptor implements + RequestInterceptor { + private Configuration conf; + private AMRMProxyApplicationContext appContext; + private RequestInterceptor nextInterceptor; + + /** + * Sets the {@link RequestInterceptor} in the chain. + */ + @Override + public void setNextInterceptor(RequestInterceptor nextInterceptor) { + this.nextInterceptor = nextInterceptor; + } + + /** + * Sets the {@link Configuration}. + */ + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + if (this.nextInterceptor != null) { + this.nextInterceptor.setConf(conf); + } + } + + /** + * Gets the {@link Configuration}. + */ + @Override + public Configuration getConf() { + return this.conf; + } + + /** + * Initializes the {@link RequestInterceptor}. 
+ */ + @Override + public void init(AMRMProxyApplicationContext appContext) { + Preconditions.checkState(this.appContext == null, + "init is called multiple times on this interceptor: " + + this.getClass().getName()); + this.appContext = appContext; + if (this.nextInterceptor != null) { + this.nextInterceptor.init(appContext); + } + } + + /** + * Disposes the {@link RequestInterceptor}. + */ + @Override + public void shutdown() { + if (this.nextInterceptor != null) { + this.nextInterceptor.shutdown(); + } + } + + /** + * Gets the next {@link RequestInterceptor} in the chain. + */ + @Override + public RequestInterceptor getNextInterceptor() { + return this.nextInterceptor; + } + + /** + * Gets the {@link AMRMProxyApplicationContext}. + */ + public AMRMProxyApplicationContext getApplicationContext() { + return this.appContext; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java new file mode 100644 index 0000000..2c7939b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the AbstractRequestInterceptor class and provides an implementation + * that simply forwards the AM requests 
to the cluster resource manager. + * + */ +public final class DefaultRequestInterceptor extends + AbstractRequestInterceptor { + private static final Logger LOG = LoggerFactory + .getLogger(DefaultRequestInterceptor.class); + private ApplicationMasterProtocol rmClient; + private UserGroupInformation user = null; + + @Override + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + try { + user = + UserGroupInformation.createProxyUser(appContext + .getApplicationAttemptId().toString(), UserGroupInformation + .getCurrentUser()); + user.addToken(appContext.getAMRMToken()); + final Configuration conf = this.getConf(); + + rmClient = + user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() { + @Override + public ApplicationMasterProtocol run() throws Exception { + return ClientRMProxy.createRMProxy(conf, + ApplicationMasterProtocol.class); + } + }); + } catch (IOException e) { + String message = + "Error while creating of RM app master service proxy for attemptId:" + + appContext.getApplicationAttemptId().toString(); + if (user != null) { + message += ", user: " + user; + } + + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + final RegisterApplicationMasterRequest request) + throws YarnException, IOException { + LOG.info("Forwarding registration request to the real YARN RM"); + return rmClient.registerApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(final AllocateRequest request) + throws YarnException, IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the real YARN RM"); + } + AllocateResponse allocateResponse = rmClient.allocate(request); + if (allocateResponse.getAMRMToken() != null) { + updateAMRMToken(allocateResponse.getAMRMToken()); + } + + return allocateResponse; + } + + 
@Override + public FinishApplicationMasterResponse finishApplicationMaster( + final FinishApplicationMasterRequest request) throws YarnException, + IOException { + LOG.info("Forwarding finish application request to " + + "the real YARN Resource Manager"); + return rmClient.finishApplicationMaster(request); + } + + @Override + public void setNextInterceptor(RequestInterceptor next) { + throw new YarnRuntimeException( + "setNextInterceptor is being called on DefaultRequestInterceptor," + + "which should be the last one in the chain " + + "Check if the interceptor pipeline configuration is correct"); + } + + private void updateAMRMToken(Token token) throws IOException { + org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = + new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>( + token.getIdentifier().array(), token.getPassword().array(), + new Text(token.getKind()), new Text(token.getService())); + // Preserve the token service sent by the RM when adding the token + // to ensure we replace the previous token setup by the RM. + // Afterwards we can update the service address for the RPC layer. 
+ user.addToken(amrmToken); + amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf())); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java new file mode 100644 index 0000000..c74c88f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; + +/** + * Defines the contract to be implemented by the request intercepter classes, + * that can be used to intercept and inspect messages sent from the application + * master to the resource manager. + */ +public interface RequestInterceptor extends ApplicationMasterProtocol, + Configurable { + /** + * This method is called for initializing the intercepter. This is guaranteed + * to be called only once in the lifetime of this instance. + * + * @param ctx + */ + void init(AMRMProxyApplicationContext ctx); + + /** + * This method is called to release the resources held by the intercepter. + * This will be called when the application pipeline is being destroyed. The + * concrete implementations should dispose the resources and forward the + * request to the next intercepter, if any. + */ + void shutdown(); + + /** + * Sets the next intercepter in the pipeline. The concrete implementation of + * this interface should always pass the request to the nextInterceptor after + * inspecting the message. The last intercepter in the chain is responsible to + * send the messages to the resource manager service and so the last + * intercepter will not receive this method call. + * + * @param nextInterceptor + */ + void setNextInterceptor(RequestInterceptor nextInterceptor); + + /** + * Returns the next intercepter in the chain. + * + * @return the next intercepter in the chain + */ + RequestInterceptor getNextInterceptor(); + + /** + * Returns the context. 
+ * + * @return the context + */ + AMRMProxyApplicationContext getApplicationContext(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 68c7f2c..a658e53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -42,7 +42,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.NetUtils; @@ -51,7 +50,6 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.token.SecretManager.InvalidToken; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.CompositeService; 
import org.apache.hadoop.service.Service; @@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; @@ -103,6 +102,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -135,6 +135,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; @@ -172,6 +173,8 @@ public class ContainerManagerImpl extends CompositeService implements private boolean serviceStopped = false; private final ReadLock readLock; private final 
WriteLock writeLock; + private AMRMProxyService amrmProxyService; + private boolean amrmProxyEnabled = false; private long waitForContainersOnShutdownMillis; @@ -235,6 +238,20 @@ public class ContainerManagerImpl extends CompositeService implements addService(sharedCacheUploader); dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader); + amrmProxyEnabled = + conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, + YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED); + + if (amrmProxyEnabled) { + LOG.info("AMRMProxyService is enabled. " + + "All the AM->RM requests will be intercepted by the proxy"); + this.amrmProxyService = + new AMRMProxyService(this.context, this.dispatcher); + addService(this.amrmProxyService); + } else { + LOG.info("AMRMProxyService is disabled"); + } + waitForContainersOnShutdownMillis = conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + @@ -246,6 +263,10 @@ public class ContainerManagerImpl extends CompositeService implements recover(); } + public boolean isARMRMProxyEnabled() { + return amrmProxyEnabled; + } + @SuppressWarnings("unchecked") private void recover() throws IOException, URISyntaxException { NMStateStoreService stateStore = context.getNMStateStore(); @@ -314,7 +335,8 @@ public class ContainerManagerImpl extends CompositeService implements + " with exit code " + rcs.getExitCode()); if (context.getApplications().containsKey(appId)) { - Credentials credentials = parseCredentials(launchContext); + Credentials credentials = + YarnServerSecurityUtils.parseCredentials(launchContext); Container container = new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), req.getContainerLaunchContext(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), @@ -737,8 +759,17 @@ public class ContainerManagerImpl extends CompositeService implements verifyAndGetContainerTokenIdentifier(request.getContainerToken(), 
containerTokenIdentifier); containerId = containerTokenIdentifier.getContainerID(); - startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, - request); + + // Initialize the AMRMProxy service instance only if the container is of + // type AM and if the AMRMProxy service is enabled + if (isARMRMProxyEnabled() + && containerTokenIdentifier.getContainerType().equals( + ContainerType.APPLICATION_MASTER)) { + this.amrmProxyService.processApplicationStartRequest(request); + } + + startContainerInternal(nmTokenIdentifier, + containerTokenIdentifier, request); succeededContainers.add(containerId); } catch (YarnException e) { failedContainers.put(containerId, SerializedException.newInstance(e)); @@ -751,7 +782,7 @@ public class ContainerManagerImpl extends CompositeService implements } return StartContainersResponse.newInstance(getAuxServiceMetaData(), - succeededContainers, failedContainers); + succeededContainers, failedContainers); } private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, @@ -844,7 +875,8 @@ public class ContainerManagerImpl extends CompositeService implements } } - Credentials credentials = parseCredentials(launchContext); + Credentials credentials = + YarnServerSecurityUtils.parseCredentials(launchContext); Container container = new ContainerImpl(getConfig(), this.dispatcher, @@ -928,27 +960,6 @@ public class ContainerManagerImpl extends CompositeService implements nmTokenIdentifier); } - private Credentials parseCredentials(ContainerLaunchContext launchContext) - throws IOException { - Credentials credentials = new Credentials(); - // //////////// Parse credentials - ByteBuffer tokens = launchContext.getTokens(); - - if (tokens != null) { - DataInputByteBuffer buf = new DataInputByteBuffer(); - tokens.rewind(); - buf.reset(tokens); - credentials.readTokenStorageStream(buf); - if (LOG.isDebugEnabled()) { - for (Token<? 
extends TokenIdentifier> tk : credentials.getAllTokens()) { - LOG.debug(tk.getService() + " = " + tk.toString()); - } - } - } - // //////////// End of parsing credentials - return credentials; - } - /** * Stop a list of containers running on this NodeManager. */
