Repository: hadoop
Updated Branches:
  refs/heads/branch-2 60565976e -> 693792583


http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index 33617d4..78f6eb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -26,18 +27,26 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
 import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
 import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
 import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Extends the FederationInterceptor and overrides methods to provide a testable
  * implementation of FederationInterceptor.
  */
 public class TestableFederationInterceptor extends FederationInterceptor {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestableFederationInterceptor.class);
+
   private ConcurrentHashMap<String, MockResourceManagerFacade>
       secondaryResourceManagers = new ConcurrentHashMap<>();
   private AtomicInteger runningIndex = new AtomicInteger(0);
@@ -58,6 +67,12 @@ public class TestableFederationInterceptor extends FederationInterceptor {
     return new TestableUnmanagedAMPoolManager(threadPool);
   }
 
+  @Override
+  protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
+      Configuration conf, ApplicationId appId) {
+    return new TestableAMRequestHandlerThread(conf, appId);
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
@@ -109,6 +124,71 @@ public class TestableFederationInterceptor extends FederationInterceptor {
     return secondaryResourceManagers;
   }
 
+  protected MockResourceManagerFacade getSecondaryRM(String scId) {
+    return secondaryResourceManagers.get(scId);
+  }
+
+  /**
+   * Drain all async heartbeat threads, comes in two flavors:
+   *
+   * 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to
+   * pick up all pending heartbeat requests. Not necessarily wait for all
+   * threads to finish processing the last request. This is used to make sure
+   * all new UAM are launched by the async threads, but at the same time will
+   * finish draining while (slow) RM is still processing the last heartbeat
+   * request.
+   *
+   * 2. waitForAsyncHBThreadFinish == true. Wait for all async threads to finish
+   * processing all heartbeat requests.
+   */
+  protected void drainAllAsyncQueue(boolean waitForAsyncHBThreadFinish)
+      throws YarnException {
+
+    LOG.info("waiting to drain home heartbeat handler");
+    if (waitForAsyncHBThreadFinish) {
+      getHomeHeartbeartHandler().drainHeartbeatThread();
+    } else {
+      while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+
+    LOG.info("waiting to drain UAM heartbeat handlers");
+    UnmanagedAMPoolManager uamPool = getUnmanagedAMPool();
+    if (waitForAsyncHBThreadFinish) {
+      getUnmanagedAMPool().drainUAMHeartbeats();
+    } else {
+      while (true) {
+        boolean done = true;
+        for (String scId : uamPool.getAllUAMIds()) {
+          if (uamPool.getRequestQueueSize(scId) > 0) {
+            done = false;
+            break;
+          }
+        }
+        if (done) {
+          break;
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+  }
+
+  protected UserGroupInformation getUGIWithToken(
+      ApplicationAttemptId appAttemptId) {
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1);
+    ugi.addTokenIdentifier(token);
+    return ugi;
+  }
+
   /**
    * Extends the UnmanagedAMPoolManager and overrides methods to provide a
    * testable implementation of UnmanagedAMPoolManager.
@@ -141,6 +221,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
       super(conf, appId, queueName, submitter, appNameSuffix,
           keepContainersAcrossApplicationAttempts, "TEST");
+      setHandlerThread(new TestableAMRequestHandlerThread(conf, appId));
     }
 
     /**
@@ -156,4 +237,30 @@ public class TestableFederationInterceptor extends FederationInterceptor {
           YarnConfiguration.getClusterId(config));
     }
   }
+
+  /**
+   * Wrap the handler thread so it calls from the same user.
+   */
+  protected class TestableAMRequestHandlerThread
+      extends AMHeartbeatRequestHandler {
+    public TestableAMRequestHandlerThread(Configuration conf,
+        ApplicationId applicationId) {
+      super(conf, applicationId);
+    }
+
+    @Override
+    public void run() {
+      try {
+        getUGIWithToken(getAttemptId())
+            .doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() {
+                TestableAMRequestHandlerThread.super.run();
+                return null;
+              }
+            });
+      } catch (Exception e) {
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index b53e997..147b6b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -80,7 +80,6 @@ import com.google.common.annotations.VisibleForTesting;
 public class ApplicationMasterService extends AbstractService implements
     ApplicationMasterProtocol {
   private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
-  private static final int PRE_REGISTER_RESPONSE_ID = -1;
 
   private final AMLivelinessMonitor amLivelinessMonitor;
   private YarnScheduler rScheduler;
@@ -332,11 +331,6 @@ public class ApplicationMasterService extends AbstractService implements
   protected static final Allocation EMPTY_ALLOCATION = new Allocation(
       EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
 
-  private int getNextResponseId(int responseId) {
-    // Loop between 0 to Integer.MAX_VALUE
-    return (responseId + 1) & Integer.MAX_VALUE;
-  }
-
   @Override
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
@@ -370,8 +364,8 @@ public class ApplicationMasterService extends AbstractService implements
       }
 
       // Normally request.getResponseId() == lastResponse.getResponseId()
-      if (getNextResponseId(request.getResponseId()) == lastResponse
-          .getResponseId()) {
+      if (AMRMClientUtils.getNextResponseId(
+          request.getResponseId()) == lastResponse.getResponseId()) {
         // heartbeat one step old, simply return lastReponse
         return lastResponse;
       } else if (request.getResponseId() != lastResponse.getResponseId()) {
@@ -416,7 +410,8 @@ public class ApplicationMasterService extends AbstractService implements
        * need to worry about unregister call occurring in between (which
        * removes the lock object).
        */
-      response.setResponseId(getNextResponseId(lastResponse.getResponseId()));
+      response.setResponseId(
+          AMRMClientUtils.getNextResponseId(lastResponse.getResponseId()));
       lock.setAllocateResponse(response);
       return response;
     }
@@ -427,7 +422,7 @@ public class ApplicationMasterService extends AbstractService implements
         recordFactory.newRecordInstance(AllocateResponse.class);
     // set response id to -1 before application master for the following
     // attemptID get registered
-    response.setResponseId(PRE_REGISTER_RESPONSE_ID);
+    response.setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID);
     LOG.info("Registering app attempt : " + attemptId);
     responseMap.put(attemptId, new AllocateResponseLock(response));
     rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to