http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java new file mode 100644 index 0000000..964379a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -0,0 +1,677 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import 
org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +/** + * Base class for all the AMRMProxyService test cases. 
It provides utility methods that can be used by the concrete test case
 * classes: it starts and stops a {@link MockAMRMProxyService} wired to a
 * pipeline of pass-through interceptors that ends in a
 * {@link MockRequestInterceptor}, and offers helpers to register, allocate
 * for and finish application masters, either one at a time or in parallel.
 */
public abstract class BaseAMRMProxyTest {
  private static final Log LOG = LogFactory
      .getLog(BaseAMRMProxyTest.class);
  /**
   * The AMRMProxyService instance that will be used by all the test cases
   */
  private MockAMRMProxyService amrmProxyService;
  /**
   * Thread pool used for asynchronous operations. It is shared by every test
   * instance and is never reassigned.
   */
  private static final ExecutorService threadpool = Executors
      .newCachedThreadPool();
  private Configuration conf;
  private AsyncDispatcher dispatcher;

  /**
   * Returns the proxy service created by {@link #setUp()}, asserting that it
   * exists.
   *
   * @return the running mock AMRMProxy service
   */
  protected MockAMRMProxyService getAMRMProxyService() {
    Assert.assertNotNull(this.amrmProxyService);
    return this.amrmProxyService;
  }

  /**
   * Builds the configuration, starts the dispatcher and starts the proxy
   * service before each test.
   */
  @Before
  public void setUp() {
    this.conf = new YarnConfiguration();
    this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
    String mockPassThroughInterceptorClass =
        PassThroughRequestInterceptor.class.getName();

    // Create a request interceptor pipeline for testing. The last one in the
    // chain will call the mock resource manager. The others in the chain will
    // simply forward it to the next one in the chain
    this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
        mockPassThroughInterceptorClass + ","
            + mockPassThroughInterceptorClass + ","
            + mockPassThroughInterceptorClass + ","
            + MockRequestInterceptor.class.getName());

    this.dispatcher = new AsyncDispatcher();
    this.dispatcher.init(conf);
    this.dispatcher.start();
    this.amrmProxyService = createAndStartAMRMProxyService();
  }

  /**
   * Stops the proxy service and the dispatcher after each test.
   */
  @After
  public void tearDown() {
    // JUnit runs @After methods even when @Before failed part-way, so either
    // field may still be null here. The dispatcher must be stopped even if
    // stopping the proxy service throws, otherwise its thread leaks into the
    // following tests.
    try {
      if (this.amrmProxyService != null) {
        this.amrmProxyService.stop();
      }
    } finally {
      this.amrmProxyService = null;
      if (this.dispatcher != null) {
        this.dispatcher.stop();
      }
    }
  }

  /**
   * @return the shared thread pool used for asynchronous operations
   */
  protected ExecutorService getThreadPool() {
    return threadpool;
  }

  /**
   * Creates a {@link MockAMRMProxyService} bound to a {@link NullContext} and
   * the test dispatcher, then initializes and starts it with the test
   * configuration.
   *
   * @return the started service
   */
  protected MockAMRMProxyService createAndStartAMRMProxyService() {
    MockAMRMProxyService svc =
        new MockAMRMProxyService(new NullContext(), dispatcher);
    svc.init(conf);
    svc.start();
    return svc;
  }

  /**
   * This helper method will invoke the specified function in parallel for each
   * end point in the specified list using a thread pool and return the
   * responses received from the function. It implements the logic required for
   * dispatching requests in parallel and waiting for the responses. If any of
   * the function call fails or times out, it will ignore and proceed with the
   * rest. So the responses returned can be less than the number of end points
   * specified. A function call that throws contributes a {@code null} entry
   * to the returned list.
   *
   * @param testContexts the contexts to invoke the function for, one task
   *          per element
   * @param func the function invoked for each context
   * @return the responses in completion order; may contain {@code null}
   *         entries and may be shorter than {@code testContexts}
   */
  protected <T, R> List<R> runInParallel(List<T> testContexts,
      final Function<T, R> func) {
    ExecutorCompletionService<R> completionService =
        new ExecutorCompletionService<R>(this.getThreadPool());
    LOG.info("Sending requests to endpoints asynchronously. Number of test contexts="
        + testContexts.size());
    for (final T testContext : testContexts) {
      LOG.info("Adding request to threadpool for test context: "
          + testContext.toString());

      completionService.submit(new Callable<R>() {
        @Override
        public R call() throws Exception {
          LOG.info("Sending request. Test context:"
              + testContext.toString());

          R response = null;
          try {
            response = func.invoke(testContext);
            LOG.info("Successfully sent request for context: "
                + testContext.toString());
          } catch (Throwable ex) {
            // Best effort by design: a failed call yields a null response.
            // Log the throwable itself so the stack trace is not lost.
            LOG.error("Failed to process request for context: "
                + testContext, ex);
            response = null;
          }

          return response;
        }
      });
    }

    ArrayList<R> responseList = new ArrayList<R>();
    LOG.info("Waiting for responses from endpoints. Number of contexts="
        + testContexts.size());
    for (int i = 0; i < testContexts.size(); ++i) {
      try {
        final Future<R> future = completionService.take();
        final R response = future.get(3000, TimeUnit.MILLISECONDS);
        responseList.add(response);
      } catch (InterruptedException e) {
        // Restore the interrupt status for the caller and stop waiting:
        // every further take() would fail immediately anyway.
        Thread.currentThread().interrupt();
        LOG.error("Failed to process request " + e.getMessage(), e);
        break;
      } catch (Throwable e) {
        LOG.error("Failed to process request " + e.getMessage(), e);
      }
    }

    return responseList;
  }

  /**
   * Helper method to register an application master using specified testAppId
   * as the application identifier and return the response. The pipeline for
   * the application is initialized first, and the whole call runs as the
   * user returned by {@link #getApplicationUserInfo(int)}.
   *
   * @param testAppId identifier used to derive the application attempt; it
   *          is also sent as the request's host and RPC port
   * @return the response returned by the proxy service
   * @throws Exception
   * @throws YarnException
   * @throws IOException
   */
  protected RegisterApplicationMasterResponse registerApplicationMaster(
      final int testAppId) throws Exception, YarnException, IOException {
    final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);

    return ugi
        .getUser()
        .doAs(
            new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
              @Override
              public RegisterApplicationMasterResponse run()
                  throws Exception {
                getAMRMProxyService().initApp(
                    ugi.getAppAttemptId(),
                    ugi.getUser().getUserName());

                final RegisterApplicationMasterRequest req =
                    Records
                        .newRecord(RegisterApplicationMasterRequest.class);
                req.setHost(Integer.toString(testAppId));
                req.setRpcPort(testAppId);
                req.setTrackingUrl("");

                RegisterApplicationMasterResponse response =
                    getAMRMProxyService().registerApplicationMaster(req);
                return response;
              }
            });
  }

  /**
   * Helper method that can be used to register multiple application masters in
   * parallel to the specified RM end points. The position of each context in
   * the list is used as its test application id.
   *
   * @param testContexts - used to identify the requests
   * @return one response holder per test context
   */
  protected <T> List<RegisterApplicationMasterResponseInfo<T>> registerApplicationMastersInParallel(
      final ArrayList<T> testContexts) {
    List<RegisterApplicationMasterResponseInfo<T>> responses =
        runInParallel(testContexts,
            new Function<T, RegisterApplicationMasterResponseInfo<T>>() {
              @Override
              public RegisterApplicationMasterResponseInfo<T> invoke(
                  T testContext) {
                RegisterApplicationMasterResponseInfo<T> response = null;
                try {
                  int index = testContexts.indexOf(testContext);
                  response =
                      new RegisterApplicationMasterResponseInfo<T>(
                          registerApplicationMaster(index), testContext);
                  Assert.assertNotNull(response.getResponse());
                  Assert.assertEquals(Integer.toString(index), response
                      .getResponse().getQueue());

                  LOG.info("Sucessfully registered application master with test context: "
                      + testContext);
                } catch (Throwable ex) {
                  response = null;
                  LOG.error("Failed to register application master with test context: "
                      + testContext, ex);
                }

                return response;
              }
            });

    Assert.assertEquals(
        "Number of responses received does not match with request",
        testContexts.size(), responses.size());

    Set<T> contextResponses = new TreeSet<T>();
    for (RegisterApplicationMasterResponseInfo<T> item : responses) {
      // A failed registration is reported as a null entry; fail with an
      // assertion instead of a NullPointerException.
      Assert.assertNotNull(item);
      Assert.assertNotNull(item.getResponse());
      contextResponses.add(item.getTestContext());
    }

    for (T ep : testContexts) {
      Assert.assertTrue(contextResponses.contains(ep));
    }

    return responses;
  }

  /**
   * Unregisters the application master for specified application id and then
   * stops the application in the proxy service.
   *
   * @param appId identifier used to derive the application attempt
   * @param status final status reported in the finish request
   * @return the response returned by the proxy service
   * @throws Exception
   * @throws YarnException
   * @throws IOException
   */
  protected FinishApplicationMasterResponse finishApplicationMaster(
      final int appId, final FinalApplicationStatus status)
      throws Exception, YarnException, IOException {

    final ApplicationUserInfo ugi = getApplicationUserInfo(appId);

    return ugi.getUser().doAs(
        new PrivilegedExceptionAction<FinishApplicationMasterResponse>() {
          @Override
          public FinishApplicationMasterResponse run() throws Exception {
            final FinishApplicationMasterRequest req =
                Records.newRecord(FinishApplicationMasterRequest.class);
            req.setDiagnostics("");
            req.setTrackingUrl("");
            req.setFinalApplicationStatus(status);

            FinishApplicationMasterResponse response =
                getAMRMProxyService().finishApplicationMaster(req);

            getAMRMProxyService().stopApp(
                ugi.getAppAttemptId().getApplicationId());

            return response;
          }
        });
  }

  /**
   * Helper method that finishes multiple application masters in parallel
   * with status {@link FinalApplicationStatus#SUCCEEDED}. The position of
   * each context in the list is used as its test application id.
   *
   * @param testContexts - used to identify the requests
   * @return one response holder per test context
   */
  protected <T> List<FinishApplicationMasterResponseInfo<T>> finishApplicationMastersInParallel(
      final ArrayList<T> testContexts) {
    List<FinishApplicationMasterResponseInfo<T>> responses =
        runInParallel(testContexts,
            new Function<T, FinishApplicationMasterResponseInfo<T>>() {
              @Override
              public FinishApplicationMasterResponseInfo<T> invoke(
                  T testContext) {
                FinishApplicationMasterResponseInfo<T> response = null;
                try {
                  response =
                      new FinishApplicationMasterResponseInfo<T>(
                          finishApplicationMaster(
                              testContexts.indexOf(testContext),
                              FinalApplicationStatus.SUCCEEDED),
                          testContext);
                  Assert.assertNotNull(response.getResponse());

                  LOG.info("Sucessfully finished application master with test contexts: "
                      + testContext);
                } catch (Throwable ex) {
                  response = null;
                  LOG.error("Failed to finish application master with test context: "
                      + testContext, ex);
                }

                return response;
              }
            });

    Assert.assertEquals(
        "Number of responses received does not match with request",
        testContexts.size(), responses.size());

    Set<T> contextResponses = new TreeSet<T>();
    for (FinishApplicationMasterResponseInfo<T> item : responses) {
      Assert.assertNotNull(item);
      Assert.assertNotNull(item.getResponse());
      contextResponses.add(item.getTestContext());
    }

    for (T ep : testContexts) {
      Assert.assertTrue(contextResponses.contains(ep));
    }

    return responses;
  }

  /**
   * Sends an otherwise empty allocate request whose response id is set to
   * the test application id.
   *
   * @param testAppId identifier used to derive the application attempt
   * @return the response returned by the proxy service
   */
  protected AllocateResponse allocate(final int testAppId)
      throws Exception, YarnException, IOException {
    final AllocateRequest req = Records.newRecord(AllocateRequest.class);
    req.setResponseId(testAppId);
    return allocate(testAppId, req);
  }

  /**
   * Sends the given allocate request through the proxy service as the user
   * returned by {@link #getApplicationUserInfo(int)}.
   *
   * @param testAppId identifier used to derive the application attempt
   * @param request the request to forward
   * @return the response returned by the proxy service
   */
  protected AllocateResponse allocate(final int testAppId,
      final AllocateRequest request) throws Exception, YarnException,
      IOException {

    final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);

    return ugi.getUser().doAs(
        new PrivilegedExceptionAction<AllocateResponse>() {
          @Override
          public AllocateResponse run() throws Exception {
            AllocateResponse response =
                getAMRMProxyService().allocate(request);
            return response;
          }
        });
  }

  /**
   * Creates a remote user named after the application attempt derived from
   * the given id and attaches an {@link AMRMTokenIdentifier} for that attempt
   * to it.
   *
   * @param testAppId identifier used to derive the application attempt
   * @return the user together with its application attempt id
   */
  protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) {
    final ApplicationAttemptId attemptId =
        getApplicationAttemptId(testAppId);

    UserGroupInformation ugi =
        UserGroupInformation.createRemoteUser(attemptId.toString());
    AMRMTokenIdentifier token = new AMRMTokenIdentifier(attemptId, 1);
    ugi.addTokenIdentifier(token);
    return new ApplicationUserInfo(ugi, attemptId);
  }

  /**
   * Same as
   * {@link #createResourceRequests(String[], int, int, int, int, String)}
   * without a node label expression.
   */
  protected List<ResourceRequest> createResourceRequests(String[] hosts,
      int memory, int vCores, int priority, int containers)
      throws Exception {
    return createResourceRequests(hosts, memory, vCores, priority,
        containers, null);
  }

  /**
   * Creates, for every host, one request for the host and one for
   * "/default-rack", followed by a single {@link ResourceRequest#ANY}
   * request. All requests share the same capability, priority, container
   * count and label expression.
   *
   * @param hosts host names to request resources on
   * @param memory memory of the requested capability
   * @param vCores virtual cores of the requested capability
   * @param priority priority of the requests
   * @param containers number of containers per request
   * @param labelExpression node label expression, or {@code null} for none
   * @return the list of resource requests
   */
  protected List<ResourceRequest> createResourceRequests(String[] hosts,
      int memory, int vCores, int priority, int containers,
      String labelExpression) throws Exception {
    List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
    for (String host : hosts) {
      ResourceRequest hostReq =
          createResourceRequest(host, memory, vCores, priority,
              containers, labelExpression);
      reqs.add(hostReq);
      ResourceRequest rackReq =
          createResourceRequest("/default-rack", memory, vCores, priority,
              containers, labelExpression);
      reqs.add(rackReq);
    }

    ResourceRequest offRackReq =
        createResourceRequest(ResourceRequest.ANY, memory, vCores,
            priority, containers, labelExpression);
    reqs.add(offRackReq);
    return reqs;
  }

  /**
   * Same as
   * {@link #createResourceRequest(String, int, int, int, int, String)}
   * without a node label expression.
   */
  protected ResourceRequest createResourceRequest(String resource,
      int memory, int vCores, int priority, int containers)
      throws Exception {
    return createResourceRequest(resource, memory, vCores, priority,
        containers, null);
  }

  /**
   * Creates a single resource request.
   *
   * @param resource resource name (host, rack or
   *          {@link ResourceRequest#ANY})
   * @param memory memory of the requested capability
   * @param vCores virtual cores of the requested capability
   * @param priority priority of the request
   * @param containers number of containers requested
   * @param labelExpression node label expression; only set on the request
   *          when not {@code null}
   * @return the populated request
   */
  protected ResourceRequest createResourceRequest(String resource,
      int memory, int vCores, int priority, int containers,
      String labelExpression) throws Exception {
    ResourceRequest req = Records.newRecord(ResourceRequest.class);
    req.setResourceName(resource);
    req.setNumContainers(containers);
    Priority pri = Records.newRecord(Priority.class);
    pri.setPriority(priority);
    req.setPriority(pri);
    Resource capability = Records.newRecord(Resource.class);
    capability.setMemory(memory);
    capability.setVirtualCores(vCores);
    req.setCapability(capability);
    if (labelExpression != null) {
      req.setNodeLabelExpression(labelExpression);
    }
    return req;
  }

  /**
   * Returns an ApplicationId with the specified identifier
   *
   * @param testAppId id component of the application id
   * @return the application id, using the fixed cluster timestamp 123456
   */
  protected ApplicationId getApplicationId(int testAppId) {
    return ApplicationId.newInstance(123456, testAppId);
  }

  /**
   * Return an instance of ApplicationAttemptId using specified identifier. This
   * identifier will be used for the ApplicationId too.
   *
   * @param testAppId used both as application id and as attempt id
   * @return the application attempt id
   */
  protected ApplicationAttemptId getApplicationAttemptId(int testAppId) {
    return ApplicationAttemptId.newInstance(getApplicationId(testAppId),
        testAppId);
  }

  /**
   * Return an instance of ApplicationAttemptId using specified identifier and
   * application id
   *
   * @param testAppId used as the attempt id
   * @param appId the application the attempt belongs to
   * @return the application attempt id
   */
  protected ApplicationAttemptId getApplicationAttemptId(int testAppId,
      ApplicationId appId) {
    return ApplicationAttemptId.newInstance(appId, testAppId);
  }

  /**
   * Pairs a register response with the test context it was produced for.
   */
  protected static class RegisterApplicationMasterResponseInfo<T> {
    private RegisterApplicationMasterResponse response;
    private T testContext;

    RegisterApplicationMasterResponseInfo(
        RegisterApplicationMasterResponse response, T testContext) {
      this.response = response;
      this.testContext = testContext;
    }

    public RegisterApplicationMasterResponse getResponse() {
      return response;
    }

    public T getTestContext() {
      return testContext;
    }
  }

  /**
   * Pairs a finish response with the test context it was produced for.
   */
  protected static class FinishApplicationMasterResponseInfo<T> {
    private FinishApplicationMasterResponse response;
    private T testContext;

    FinishApplicationMasterResponseInfo(
        FinishApplicationMasterResponse response, T testContext) {
      this.response = response;
      this.testContext = testContext;
    }

    public FinishApplicationMasterResponse getResponse() {
      return response;
    }

    public T getTestContext() {
      return testContext;
    }
  }

  /**
   * Pairs a user with the application attempt it acts for.
   */
  protected static class ApplicationUserInfo {
    private UserGroupInformation user;
    private ApplicationAttemptId attemptId;

    ApplicationUserInfo(UserGroupInformation user,
        ApplicationAttemptId attemptId) {
      this.user = user;
      this.attemptId = attemptId;
    }

    public UserGroupInformation getUser() {
      return this.user;
    }

    public ApplicationAttemptId getAppAttemptId() {
      return this.attemptId;
    }
  }

  /**
   * AMRMProxyService subclass that lets the tests initialize and stop
   * application pipelines directly.
   */
  protected static class MockAMRMProxyService extends AMRMProxyService {
    public MockAMRMProxyService(Context nmContext,
        AsyncDispatcher dispatcher) {
      super(nmContext, dispatcher);
    }

    /**
     * This method is used by the test code to initialize the pipeline. In the
     * actual service, the initialization is called by the
     * ContainerManagerImpl::StartContainers method
     *
     * @param applicationId the application attempt to create a pipeline for
     * @param user the user the pipeline is created for
     */
    public void initApp(ApplicationAttemptId applicationId, String user) {
      super.initializePipeline(applicationId, user, null, null);
    }

    /**
     * Stops the given application in the proxy service.
     *
     * @param applicationId the application to stop
     */
    public void stopApp(ApplicationId applicationId) {
      super.stopApplication(applicationId);
    }
  }

  /**
   * The Function interface is used for passing method pointers that can be
   * invoked asynchronously at a later point.
   */
  protected interface Function<T, R> {
    public R invoke(T input);
  }

  /**
   * Context stub whose getters return {@code null}, {@code 0} or
   * {@code false} and whose setter does nothing.
   */
  protected class NullContext implements Context {

    @Override
    public NodeId getNodeId() {
      return null;
    }

    @Override
    public int getHttpPort() {
      return 0;
    }

    @Override
    public ConcurrentMap<ApplicationId, Application> getApplications() {
      return null;
    }

    @Override
    public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
      return null;
    }

    @Override
    public ConcurrentMap<ContainerId, Container> getContainers() {
      return null;
    }

    @Override
    public NMContainerTokenSecretManager getContainerTokenSecretManager() {
      return null;
    }

    @Override
    public NMTokenSecretManagerInNM getNMTokenSecretManager() {
      return null;
    }

    @Override
    public NodeHealthStatus getNodeHealthStatus() {
      return null;
    }

    @Override
    public ContainerManagementProtocol getContainerManager() {
      return null;
    }

    @Override
    public LocalDirsHandlerService getLocalDirsHandler() {
      return null;
    }

    @Override
    public ApplicationACLsManager getApplicationACLsManager() {
      return null;
    }

    @Override
    public NMStateStoreService getNMStateStore() {
      return null;
    }

    @Override
    public boolean getDecommissioned() {
      return false;
    }

    @Override
    public void setDecommissioned(boolean isDecommissioned) {
    }

    @Override
    public ConcurrentLinkedQueue<LogAggregationReport> getLogAggregationStatusForApps() {
      return null;
    }

    @Override
    public NodeResourceMonitor getNodeResourceMonitor() {
      return null;
    }

  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java new file mode 100644 index 0000000..c962f97 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;

import java.io.IOException;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;

/**
 * Request interceptor that answers every call from a
 * {@link MockResourceManagerFacade} created in
 * {@link #init(AMRMProxyApplicationContext)} instead of forwarding the call
 * to another interceptor.
 */
public class MockRequestInterceptor extends AbstractRequestInterceptor {

  private MockResourceManagerFacade mockRM;

  public MockRequestInterceptor() {
  }

  /**
   * Initializes the base interceptor and creates the mock resource manager
   * from a copy of this interceptor's configuration, with the container
   * index starting at 0.
   *
   * @param appContext the application context passed on to the superclass
   */
  @Override
  public void init(AMRMProxyApplicationContext appContext) {
    super.init(appContext);
    mockRM =
        new MockResourceManagerFacade(new YarnConfiguration(
            super.getConf()), 0);
  }

  /**
   * Delegates the registration to the mock resource manager.
   */
  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request) throws YarnException,
      IOException {
    return mockRM.registerApplicationMaster(request);
  }

  /**
   * Delegates the finish request to the mock resource manager.
   */
  @Override
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request) throws YarnException,
      IOException {
    return mockRM.finishApplicationMaster(request);
  }

  /**
   * Delegates the allocate request to the mock resource manager.
   */
  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {
    return mockRM.allocate(request);
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java new file mode 100644 index 0000000..7573a7a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -0,0 +1,469 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.directory.api.util.Strings; +import org.apache.directory.api.util.exception.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import 
org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.mortbay.log.Log; + +/** + * Mock Resource Manager facade implementation that exposes all the methods + * implemented by the YARN RM. The behavior and the values returned by this mock + * implementation is expected by the unit test cases. So please change the + * implementation with care. + */ +public class MockResourceManagerFacade implements + ApplicationMasterProtocol, ApplicationClientProtocol { + + private HashMap<String, List<ContainerId>> applicationContainerIdMap = + new HashMap<String, List<ContainerId>>(); + private HashMap<ContainerId, Container> allocatedContainerMap = + new HashMap<ContainerId, Container>(); + private AtomicInteger containerIndex = new AtomicInteger(0); + private Configuration conf; + + public MockResourceManagerFacade(Configuration conf, + int startContainerIndex) { + this.conf = conf; + this.containerIndex.set(startContainerIndex); + } + + private static String getAppIdentifier() throws IOException { + AMRMTokenIdentifier result = null; + UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); + Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + return result != null ? 
result.getApplicationAttemptId().toString() + : ""; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + String amrmToken = getAppIdentifier(); + Log.info("Registering application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + Assert.assertFalse("The application id is already registered: " + + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(amrmToken, + new ArrayList<ContainerId>()); + } + + return RegisterApplicationMasterResponse.newInstance(null, null, null, + null, null, request.getHost(), null); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + String amrmToken = getAppIdentifier(); + Log.info("Finishing application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + // Remove the containers that were being tracked for this application + Assert.assertTrue("The application id is NOT registered: " + + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); + List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken); + for (ContainerId c : ids) { + allocatedContainerMap.remove(c); + } + } + + return FinishApplicationMasterResponse + .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? 
true + : false); + } + + protected ApplicationId getApplicationId(int id) { + return ApplicationId.newInstance(12345, id); + } + + protected ApplicationAttemptId getApplicationAttemptId(int id) { + return ApplicationAttemptId.newInstance(getApplicationId(id), 1); + } + + @SuppressWarnings("deprecation") + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + if (request.getAskList() != null && request.getAskList().size() > 0 + && request.getReleaseList() != null + && request.getReleaseList().size() > 0) { + Assert.fail("The mock RM implementation does not support receiving " + + "askList and releaseList in the same heartbeat"); + } + + String amrmToken = getAppIdentifier(); + + ArrayList<Container> containerList = new ArrayList<Container>(); + if (request.getAskList() != null) { + for (ResourceRequest rr : request.getAskList()) { + for (int i = 0; i < rr.getNumContainers(); i++) { + ContainerId containerId = + ContainerId.newInstance(getApplicationAttemptId(1), + containerIndex.incrementAndGet()); + Container container = Records.newRecord(Container.class); + container.setId(containerId); + container.setPriority(rr.getPriority()); + + // We don't use the node for running containers in the test cases. So + // it is OK to hard code it to some dummy value + NodeId nodeId = + NodeId.newInstance( + !Strings.isEmpty(rr.getResourceName()) ? rr + .getResourceName() : "dummy", 1000); + container.setNodeId(nodeId); + container.setResource(rr.getCapability()); + containerList.add(container); + + synchronized (applicationContainerIdMap) { + // Keep track of the containers returned to this application. 
We + // will need it in future + Assert.assertTrue( + "The application id is Not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List<ContainerId> ids = + applicationContainerIdMap.get(amrmToken); + ids.add(containerId); + this.allocatedContainerMap.put(containerId, container); + } + } + } + } + + if (request.getReleaseList() != null + && request.getReleaseList().size() > 0) { + Log.info("Releasing containers: " + request.getReleaseList().size()); + synchronized (applicationContainerIdMap) { + Assert.assertTrue( + "The application id is not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List<ContainerId> ids = applicationContainerIdMap.get(amrmToken); + + for (ContainerId id : request.getReleaseList()) { + boolean found = false; + for (ContainerId c : ids) { + if (c.equals(id)) { + found = true; + break; + } + } + + Assert.assertTrue( + "ContainerId " + id + + " being released is not valid for application: " + + conf.get("AMRMTOKEN"), found); + + ids.remove(id); + + // Return the released container back to the AM with new fake Ids. The + // test case does not care about the IDs. The IDs are faked because + // otherwise the LRM will throw duplication identifier exception. 
This + // returning of fake containers is ONLY done for testing purpose - for + // the test code to get confirmation that the sub-cluster resource + // managers received the release request + ContainerId fakeContainerId = + ContainerId.newInstance(getApplicationAttemptId(1), + containerIndex.incrementAndGet()); + Container fakeContainer = allocatedContainerMap.get(id); + fakeContainer.setId(fakeContainerId); + containerList.add(fakeContainer); + } + } + } + + Log.info("Allocating containers: " + containerList.size() + + " for application attempt: " + conf.get("AMRMTOKEN")); + return AllocateResponse.newInstance(0, + new ArrayList<ContainerStatus>(), containerList, + new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null, + new ArrayList<NMToken>(), + new ArrayList<ContainerResourceIncrease>(), + new ArrayList<ContainerResourceDecrease>()); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, + IOException { + + GetApplicationReportResponse response = + Records.newRecord(GetApplicationReportResponse.class); + ApplicationReport report = Records.newRecord(ApplicationReport.class); + report.setYarnApplicationState(YarnApplicationState.ACCEPTED); + report.setApplicationId(request.getApplicationId()); + report.setCurrentApplicationAttemptId(ApplicationAttemptId + .newInstance(request.getApplicationId(), 1)); + response.setApplicationReport(report); + return response; + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) throws YarnException, + IOException { + GetApplicationAttemptReportResponse response = + Records.newRecord(GetApplicationAttemptReportResponse.class); + ApplicationAttemptReport report = + Records.newRecord(ApplicationAttemptReport.class); + report.setApplicationAttemptId(request.getApplicationAttemptId()); + report + 
.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); + response.setApplicationAttemptReport(report); + return response; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return null; + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return null; + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetApplicationsResponse getApplications( + GetApplicationsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterNodesResponse getClusterNodes( + GetClusterNodesRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + 
CancelDelegationTokenRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, + IOException { + throw new NotImplementedException(); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return null; + } + + @Override + 
public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) throws YarnException, + IOException { + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java new file mode 100644 index 0000000..97a844e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Mock interceptor that does not do anything other than forwarding the request to the + * next interceptor in the chain. + * + */ +public class PassThroughRequestInterceptor extends + AbstractRequestInterceptor { + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws YarnException, + IOException { + return getNextInterceptor().registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + return getNextInterceptor().finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + return getNextInterceptor().allocate(request); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java new file mode 100644 index 0000000..69b913a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.junit.Test; + +public class TestAMRMProxyService extends BaseAMRMProxyTest { + + private static final Log LOG = LogFactory + .getLog(TestAMRMProxyService.class); + + /** + * Test if the pipeline is created properly. + */ + @Test + public void testRequestInterceptorChainCreation() throws Exception { + RequestInterceptor root = + super.getAMRMProxyService().createRequestInterceptorChain(); + int index = 0; + while (root != null) { + switch (index) { + case 0: + case 1: + case 2: + Assert.assertEquals(PassThroughRequestInterceptor.class.getName(), + root.getClass().getName()); + break; + case 3: + Assert.assertEquals(MockRequestInterceptor.class.getName(), root + .getClass().getName()); + break; + } + + root = root.getNextInterceptor(); + index++; + } + + Assert.assertEquals( + "The number of interceptors in chain does not match", + Integer.toString(4), Integer.toString(index)); + + } + + /** + * Tests registration of a single application master. 
+ * + * @throws Exception + */ + @Test + public void testRegisterOneApplicationMaster() throws Exception { + // The testAppId identifier is used as host name and the mock resource + // manager returns it as the queue name. Assert that we received the queue + // name + int testAppId = 1; + RegisterApplicationMasterResponse response1 = + registerApplicationMaster(testAppId); + Assert.assertNotNull(response1); + Assert.assertEquals(Integer.toString(testAppId), response1.getQueue()); + } + + /** + * Tests the registration of multiple application masters serially one at a + * time. + * + * @throws Exception + */ + @Test + public void testRegisterMulitpleApplicationMasters() throws Exception { + for (int testAppId = 0; testAppId < 3; testAppId++) { + RegisterApplicationMasterResponse response = + registerApplicationMaster(testAppId); + Assert.assertNotNull(response); + Assert + .assertEquals(Integer.toString(testAppId), response.getQueue()); + } + } + + /** + * Tests the registration of multiple application masters using multiple + * threads in parallel. 
+ * + * @throws Exception + */ + @Test + public void testRegisterMulitpleApplicationMastersInParallel() + throws Exception { + int numberOfRequests = 5; + ArrayList<String> testContexts = + CreateTestRequestIdentifiers(numberOfRequests); + super.registerApplicationMastersInParallel(testContexts); + } + + private ArrayList<String> CreateTestRequestIdentifiers( + int numberOfRequests) { + ArrayList<String> testContexts = new ArrayList<String>(); + LOG.info("Creating " + numberOfRequests + " contexts for testing"); + for (int ep = 0; ep < numberOfRequests; ep++) { + testContexts.add("test-endpoint-" + Integer.toString(ep)); + LOG.info("Created test context: " + testContexts.get(ep)); + } + return testContexts; + } + + @Test + public void testFinishOneApplicationMasterWithSuccess() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId), + registerResponse.getQueue()); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId, + FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + + @Test + public void testFinishOneApplicationMasterWithFailure() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId), + registerResponse.getQueue()); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(false, finshResponse.getIsUnregistered()); + + try { + // Try to finish an application master that is already finished. 
+ finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + } + + @Test + public void testFinishInvalidApplicationMaster() throws Exception { + try { + // Try to finish an application master that was not registered. + finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + } + + @Test + public void testFinishMulitpleApplicationMasters() throws Exception { + int numberOfRequests = 3; + for (int index = 0; index < numberOfRequests; index++) { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(index); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(index), + registerResponse.getQueue()); + } + + // Finish in reverse sequence + for (int index = numberOfRequests - 1; index >= 0; index--) { + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + // Assert that the application has been removed from the collection + Assert.assertTrue(this.getAMRMProxyService() + .getPipelines().size() == index); + } + + try { + // Try to finish an application master that is already finished. + finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. 
+ LOG.info("Finish registration failed as expected because it was not registered"); + } + + try { + // Try to finish an application master that was not registered. + finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The request to finish application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("Finish registration failed as expected because it was not registered"); + } + } + + @Test + public void testFinishMulitpleApplicationMastersInParallel() + throws Exception { + int numberOfRequests = 5; + ArrayList<String> testContexts = new ArrayList<String>(); + LOG.info("Creating " + numberOfRequests + " contexts for testing"); + for (int i = 0; i < numberOfRequests; i++) { + testContexts.add("test-endpoint-" + Integer.toString(i)); + LOG.info("Created test context: " + testContexts.get(i)); + + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(i); + Assert.assertNotNull(registerResponse); + Assert + .assertEquals(Integer.toString(i), registerResponse.getQueue()); + } + + finishApplicationMastersInParallel(testContexts); + } + + @Test + public void testAllocateRequestWithNullValues() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId), + registerResponse.getQueue()); + + AllocateResponse allocateResponse = allocate(testAppId); + Assert.assertNotNull(allocateResponse); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId, + FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + } + + @Test + public void testAllocateRequestWithoutRegistering() throws Exception { + + try { + // Try to allocate an application master without registering. 
+ allocate(1); + Assert + .fail("The request to allocate application master should have failed"); + } catch (Throwable ex) { + // This is expected. So nothing required here. + LOG.info("AllocateRequest failed as expected because AM was not registered"); + } + } + + @Test + public void testAllocateWithOneResourceRequest() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + getContainersAndAssert(testAppId, 1); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + + @Test + public void testAllocateWithMultipleResourceRequest() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + getContainersAndAssert(testAppId, 10); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + + @Test + public void testAllocateAndReleaseContainers() throws Exception { + int testAppId = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + List<Container> containers = getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + } + + @Test + public void testAllocateAndReleaseContainersForMultipleAM() + throws Exception { + int numberOfApps = 5; + for (int testAppId = 0; testAppId < numberOfApps; testAppId++) { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull(registerResponse); + List<Container> containers = getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + } + for (int testAppId = 0; testAppId < numberOfApps; testAppId++) { + finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED); + 
} + } + + @Test + public void testAllocateAndReleaseContainersForMultipleAMInParallel() + throws Exception { + int numberOfApps = 6; + ArrayList<Integer> tempAppIds = new ArrayList<Integer>(); + for (int i = 0; i < numberOfApps; i++) { + tempAppIds.add(new Integer(i)); + } + + final ArrayList<Integer> appIds = tempAppIds; + List<Integer> responses = + runInParallel(appIds, new Function<Integer, Integer>() { + @Override + public Integer invoke(Integer testAppId) { + try { + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId); + Assert.assertNotNull("response is null", registerResponse); + List<Container> containers = + getContainersAndAssert(testAppId, 10); + releaseContainersAndAssert(testAppId, containers); + + LOG.info("Sucessfully registered application master with appId: " + + testAppId); + } catch (Throwable ex) { + LOG.error( + "Failed to register application master with appId: " + + testAppId, ex); + testAppId = null; + } + + return testAppId; + } + }); + + Assert.assertEquals( + "Number of responses received does not match with request", + appIds.size(), responses.size()); + + for (Integer testAppId : responses) { + Assert.assertNotNull(testAppId); + finishApplicationMaster(testAppId.intValue(), + FinalApplicationStatus.SUCCEEDED); + } + } + + private List<Container> getContainersAndAssert(int appId, + int numberOfResourceRequests) throws Exception { + AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(1); + + List<Container> containers = + new ArrayList<Container>(numberOfResourceRequests); + List<ResourceRequest> askList = + new ArrayList<ResourceRequest>(numberOfResourceRequests); + for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) { + askList.add(createResourceRequest( + "test-node-" + Integer.toString(testAppId), 6000, 2, + testAppId % 5, 1)); + } + + allocateRequest.setAskList(askList); + + AllocateResponse allocateResponse = 
allocate(appId, allocateRequest); + Assert.assertNotNull("allocate() returned null response", + allocateResponse); + + containers.addAll(allocateResponse.getAllocatedContainers()); + + // Send max 10 heart beats to receive all the containers. If not, we will + // fail the test + int numHeartbeat = 0; + while (containers.size() < askList.size() && numHeartbeat++ < 10) { + allocateResponse = + allocate(appId, Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull("allocate() returned null response", + allocateResponse); + + containers.addAll(allocateResponse.getAllocatedContainers()); + + LOG.info("Number of allocated containers in this request: " + + Integer.toString(allocateResponse.getAllocatedContainers() + .size())); + LOG.info("Total number of allocated containers: " + + Integer.toString(containers.size())); + Thread.sleep(10); + } + + // We broadcast the request, so the number of containers we receive may be + // higher than what we asked for + Assert.assertTrue("The asklist count is not same as response", + askList.size() <= containers.size()); + return containers; + } + + private void releaseContainersAndAssert(int appId, + List<Container> containers) throws Exception { + Assert.assertTrue(containers.size() > 0); + AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(1); + + List<ContainerId> relList = + new ArrayList<ContainerId>(containers.size()); + for (Container container : containers) { + relList.add(container.getId()); + } + + allocateRequest.setReleaseList(relList); + + AllocateResponse allocateResponse = allocate(appId, allocateRequest); + Assert.assertNotNull(allocateResponse); + + // The way the mock resource manager is set up, it will return the containers + // that were released in the response. This is done because the UAMs run + // asynchronously and we need to check whether all the resource managers received the + // release. 
The containers sent by the mock resource managers will be + // aggregated and returned back to us and we can assert if all the release + // lists reached the sub-clusters + List<Container> containersForReleasedContainerIds = + new ArrayList<Container>(); + containersForReleasedContainerIds.addAll(allocateResponse + .getAllocatedContainers()); + + // Send max 10 heart beats to receive all the containers. If not, we will + // fail the test + int numHeartbeat = 0; + while (containersForReleasedContainerIds.size() < relList.size() + && numHeartbeat++ < 10) { + allocateResponse = + allocate(appId, Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull(allocateResponse); + containersForReleasedContainerIds.addAll(allocateResponse + .getAllocatedContainers()); + + LOG.info("Number of containers received in this request: " + + Integer.toString(allocateResponse.getAllocatedContainers() + .size())); + LOG.info("Total number of containers received: " + + Integer.toString(containersForReleasedContainerIds.size())); + Thread.sleep(10); + } + + Assert.assertEquals(relList.size(), + containersForReleasedContainerIds.size()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index c8b985d..14142de 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -40,9 +40,7 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -76,7 +74,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -95,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -175,69 +173,13 @@ public class ApplicationMasterService extends 
AbstractService implements return this.masterServiceAddress; } - // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer - // currently sets only the required id, but iterate through anyways just to be - // sure. - private AMRMTokenIdentifier selectAMRMTokenIdentifier( - UserGroupInformation remoteUgi) throws IOException { - AMRMTokenIdentifier result = null; - Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers(); - for (TokenIdentifier tokenId : tokenIds) { - if (tokenId instanceof AMRMTokenIdentifier) { - result = (AMRMTokenIdentifier) tokenId; - break; - } - } - - return result; - } - - private AMRMTokenIdentifier authorizeRequest() - throws YarnException { - - UserGroupInformation remoteUgi; - try { - remoteUgi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - String msg = - "Cannot obtain the user-name for authorizing ApplicationMaster. " - + "Got exception: " + StringUtils.stringifyException(e); - LOG.warn(msg); - throw RPCUtil.getRemoteException(msg); - } - - boolean tokenFound = false; - String message = ""; - AMRMTokenIdentifier appTokenIdentifier = null; - try { - appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi); - if (appTokenIdentifier == null) { - tokenFound = false; - message = "No AMRMToken found for user " + remoteUgi.getUserName(); - } else { - tokenFound = true; - } - } catch (IOException e) { - tokenFound = false; - message = - "Got exception while looking for AMRMToken for user " - + remoteUgi.getUserName(); - } - - if (!tokenFound) { - LOG.warn(message); - throw RPCUtil.getRemoteException(message); - } - - return appTokenIdentifier; - } - @Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { - AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = + YarnServerSecurityUtils.authorizeRequest(); ApplicationAttemptId applicationAttemptId = 
amrmTokenIdentifier.getApplicationAttemptId(); @@ -346,7 +288,7 @@ public class ApplicationMasterService extends AbstractService implements IOException { ApplicationAttemptId applicationAttemptId = - authorizeRequest().getApplicationAttemptId(); + YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId(); ApplicationId appId = applicationAttemptId.getApplicationId(); RMApp rmApp = @@ -430,7 +372,8 @@ public class ApplicationMasterService extends AbstractService implements public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { - AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = + YarnServerSecurityUtils.authorizeRequest(); ApplicationAttemptId appAttemptId = amrmTokenIdentifier.getApplicationAttemptId();
