Repository: hadoop Updated Branches: refs/heads/branch-2 60565976e -> 693792583
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index 33617d4..78f6eb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -26,18 +27,26 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Extends the FederationInterceptor and overrides methods to provide a testable * implementation of FederationInterceptor. */ public class TestableFederationInterceptor extends FederationInterceptor { + public static final Logger LOG = + LoggerFactory.getLogger(TestableFederationInterceptor.class); + private ConcurrentHashMap<String, MockResourceManagerFacade> secondaryResourceManagers = new ConcurrentHashMap<>(); private AtomicInteger runningIndex = new AtomicInteger(0); @@ -58,6 +67,12 @@ public class TestableFederationInterceptor extends FederationInterceptor { return new TestableUnmanagedAMPoolManager(threadPool); } + @Override + protected AMHeartbeatRequestHandler createHomeHeartbeartHandler( + Configuration conf, ApplicationId appId) { + return new TestableAMRequestHandlerThread(conf, appId); + } + @SuppressWarnings("unchecked") @Override protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext, @@ -109,6 +124,71 @@ public class TestableFederationInterceptor extends FederationInterceptor { return secondaryResourceManagers; } + protected MockResourceManagerFacade getSecondaryRM(String scId) { + return secondaryResourceManagers.get(scId); + } + + /** + * Drain all aysnc heartbeat threads, comes in two favors: + * + * 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to + * pick up all pending heartbeat requests. Not necessarily wait for all + * threads to finish processing the last request. This is used to make sure + * all new UAM are launched by the async threads, but at the same time will + * finish draining while (slow) RM is still processing the last heartbeat + * request. + * + * 2. waitForAsyncHBThreadFinish == true. Wait for all async thread to finish + * processing all heartbeat requests. + */ + protected void drainAllAsyncQueue(boolean waitForAsyncHBThreadFinish) + throws YarnException { + + LOG.info("waiting to drain home heartbeat handler"); + if (waitForAsyncHBThreadFinish) { + getHomeHeartbeartHandler().drainHeartbeatThread(); + } else { + while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + + LOG.info("waiting to drain UAM heartbeat handlers"); + UnmanagedAMPoolManager uamPool = getUnmanagedAMPool(); + if (waitForAsyncHBThreadFinish) { + getUnmanagedAMPool().drainUAMHeartbeats(); + } else { + while (true) { + boolean done = true; + for (String scId : uamPool.getAllUAMIds()) { + if (uamPool.getRequestQueueSize(scId) > 0) { + done = false; + break; + } + } + if (done) { + break; + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + } + + protected UserGroupInformation getUGIWithToken( + ApplicationAttemptId appAttemptId) { + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1); + ugi.addTokenIdentifier(token); + return ugi; + } + /** * Extends the UnmanagedAMPoolManager and overrides methods to provide a * testable implementation of UnmanagedAMPoolManager. @@ -141,6 +221,7 @@ public class TestableFederationInterceptor extends FederationInterceptor { String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { super(conf, appId, queueName, submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, "TEST"); + setHandlerThread(new TestableAMRequestHandlerThread(conf, appId)); } /** @@ -156,4 +237,30 @@ public class TestableFederationInterceptor extends FederationInterceptor { YarnConfiguration.getClusterId(config)); } } + + /** + * Wrap the handler thread so it calls from the same user. + */ + protected class TestableAMRequestHandlerThread + extends AMHeartbeatRequestHandler { + public TestableAMRequestHandlerThread(Configuration conf, + ApplicationId applicationId) { + super(conf, applicationId); + } + + @Override + public void run() { + try { + getUGIWithToken(getAttemptId()) + .doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() { + TestableAMRequestHandlerThread.super.run(); + return null; + } + }); + } catch (Exception e) { + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index b53e997..147b6b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -80,7 +80,6 @@ import com.google.common.annotations.VisibleForTesting; public class ApplicationMasterService extends AbstractService implements ApplicationMasterProtocol { private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); - private static final int PRE_REGISTER_RESPONSE_ID = -1; private final AMLivelinessMonitor amLivelinessMonitor; private YarnScheduler rScheduler; @@ -332,11 +331,6 @@ public class ApplicationMasterService extends AbstractService implements protected static final Allocation EMPTY_ALLOCATION = new Allocation( EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); - private int getNextResponseId(int responseId) { - // Loop between 0 to Integer.MAX_VALUE - return (responseId + 1) & Integer.MAX_VALUE; - } - @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { @@ -370,8 +364,8 @@ public class ApplicationMasterService extends AbstractService implements } // Normally request.getResponseId() == lastResponse.getResponseId() - if (getNextResponseId(request.getResponseId()) == lastResponse - .getResponseId()) { + if (AMRMClientUtils.getNextResponseId( + request.getResponseId()) == lastResponse.getResponseId()) { // heartbeat one step old, simply return lastReponse return lastResponse; } else if (request.getResponseId() != lastResponse.getResponseId()) { @@ -416,7 +410,8 @@ public class ApplicationMasterService extends AbstractService implements * need to worry about unregister call occurring in between (which * removes the lock object). */ - response.setResponseId(getNextResponseId(lastResponse.getResponseId())); + response.setResponseId( + AMRMClientUtils.getNextResponseId(lastResponse.getResponseId())); lock.setAllocateResponse(response); return response; } @@ -427,7 +422,7 @@ public class ApplicationMasterService extends AbstractService implements recordFactory.newRecordInstance(AllocateResponse.class); // set response id to -1 before application master for the following // attemptID get registered - response.setResponseId(PRE_REGISTER_RESPONSE_ID); + response.setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID); LOG.info("Registering app attempt : " + attemptId); responseMap.put(attemptId, new AllocateResponseLock(response)); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org