Repository: hadoop Updated Branches: refs/heads/branch-2 fa4a11103 -> b0900ad31 refs/heads/trunk a23ea68b9 -> c3d22d3b4
YARN-7652. Handle AM register requests asynchronously in FederationInterceptor. Contributed by Botong Huang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c3d22d3b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c3d22d3b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c3d22d3b Branch: refs/heads/trunk Commit: c3d22d3b4569b7f87af4ee4abfcc284deebe90de Parents: a23ea68 Author: Inigo Goiri <[email protected]> Authored: Tue Oct 9 10:29:40 2018 -0700 Committer: Inigo Goiri <[email protected]> Committed: Tue Oct 9 10:29:40 2018 -0700 ---------------------------------------------------------------------- .../yarn/server/AMHeartbeatRequestHandler.java | 10 +- .../hadoop/yarn/server/AMRMClientRelayer.java | 6 +- .../server/uam/UnmanagedApplicationManager.java | 20 +- .../yarn/server/MockResourceManagerFacade.java | 29 +- .../uam/TestUnmanagedApplicationManager.java | 51 +++- .../amrmproxy/FederationInterceptor.java | 291 +++++++------------ .../amrmproxy/TestFederationInterceptor.java | 16 +- .../TestableFederationInterceptor.java | 26 +- 8 files changed, 217 insertions(+), 232 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d22d3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java index 380c216..1534354 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java @@ -59,7 +59,7 @@ public class AMHeartbeatRequestHandler extends Thread { private int lastResponseId; public AMHeartbeatRequestHandler(Configuration conf, - ApplicationId applicationId) { + ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) { super("AMHeartbeatRequestHandler Heartbeat Handler Thread"); this.setUncaughtExceptionHandler( new HeartBeatThreadUncaughtExceptionHandler()); @@ -69,6 +69,7 @@ public class AMHeartbeatRequestHandler extends Thread { this.conf = conf; this.applicationId = applicationId; this.requestQueue = new LinkedBlockingQueue<>(); + this.rmProxyRelayer = rmProxyRelayer; resetLastResponseId(); } @@ -157,13 +158,6 @@ public class AMHeartbeatRequestHandler extends Thread { } /** - * Set the AMRMClientRelayer for RM connection. - */ - public void setAMRMClientRelayer(AMRMClientRelayer relayer) { - this.rmProxyRelayer = relayer; - } - - /** * Set the UGI for RM connection. */ public void setUGI(UserGroupInformation ugi) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d22d3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java index ca045d1..dc66868 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java @@ -186,7 +186,11 @@ public class AMRMClientRelayer extends AbstractService this.amRegistrationRequest = registerRequest; } - public void setRMClient(ApplicationMasterProtocol client){ + public String getRMIdentifier() { + return this.rmId; + } + + public void setRMClient(ApplicationMasterProtocol client) { this.rmClient = client; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d22d3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 91d5d6c..134df1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -134,10 +134,12 @@ public class UnmanagedApplicationManager { this.submitter = submitter; this.appNameSuffix = appNameSuffix; this.userUgi = null; - this.heartbeatHandler = - new AMHeartbeatRequestHandler(this.conf, this.applicationId); + // Relayer's rmClient will be set after the RM connection is created this.rmProxyRelayer = new AMRMClientRelayer(null, this.applicationId, rmName); + this.heartbeatHandler = createAMHeartbeatRequestHandler(this.conf, + this.applicationId, this.rmProxyRelayer); + this.connectionInitiated = false; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); @@ -150,6 +152,13 @@ public class UnmanagedApplicationManager { keepContainersAcrossApplicationAttempts; } + @VisibleForTesting + protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler( + Configuration config, ApplicationId appId, + AMRMClientRelayer relayer) { + return new AMHeartbeatRequestHandler(config, appId, relayer); + } + /** * Launch a new UAM in the resource manager. * @@ -191,8 +200,6 @@ public class UnmanagedApplicationManager { this.applicationId.toString(), UserGroupInformation.getCurrentUser()); this.rmProxyRelayer.setRMClient(createRMProxy( ApplicationMasterProtocol.class, this.conf, this.userUgi, amrmToken)); - - this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer); this.heartbeatHandler.setUGI(this.userUgi); } @@ -522,11 +529,6 @@ public class UnmanagedApplicationManager { } @VisibleForTesting - protected void setHandlerThread(AMHeartbeatRequestHandler thread) { - this.heartbeatHandler = thread; - } - - @VisibleForTesting protected void drainHeartbeatThread() { this.heartbeatHandler.drainHeartbeatThread(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d22d3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 50a4bff..60c2ac9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -207,10 +207,15 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, private boolean shouldWaitForSyncNextAllocate = false; // For unit test synchronization - private static Object syncObj = new Object(); + private static Object registerSyncObj = new Object(); + private static Object allocateSyncObj = new Object(); - public static Object getSyncObj() { - return syncObj; + public static Object getRegisterSyncObj() { + return registerSyncObj; + } + + public static Object getAllocateSyncObj() { + return allocateSyncObj; } public MockResourceManagerFacade(Configuration conf, @@ -290,14 +295,14 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, shouldReRegisterNext = false; // Make sure we wait for certain test cases last in the method - synchronized (syncObj) { - syncObj.notifyAll(); + synchronized (registerSyncObj) { + registerSyncObj.notifyAll(); // We reuse the port number to indicate whether the unit test want us to // wait here if (request.getRpcPort() > 1000) { LOG.info("Register call in RM start waiting"); try { - syncObj.wait(); + registerSyncObj.wait(); LOG.info("Register call in RM wait finished"); } catch (InterruptedException e) { LOG.info("Register call in RM wait interrupted", e); @@ -364,13 +369,13 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } // Wait for signal for certain test cases - synchronized (syncObj) { + synchronized (allocateSyncObj) { if (shouldWaitForSyncNextAllocate) { shouldWaitForSyncNextAllocate = false; LOG.info("Allocate call in RM start waiting"); try { - syncObj.wait(); + allocateSyncObj.wait(); LOG.info("Allocate call in RM wait finished"); } catch (InterruptedException e) { LOG.info("Allocate call in RM wait interrupted", e); @@ -431,8 +436,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } Assert.assertTrue("ContainerId " + id - + " being released is not valid for application: " - + conf.get("AMRMTOKEN"), found); + + " being released is not valid for application: " + attemptId, + found); ids.remove(id); completedList.add( @@ -442,7 +447,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } LOG.info("Allocating containers: " + containerList.size() - + " for application attempt: " + conf.get("AMRMTOKEN")); + + " for application attempt: " + attemptId); // Always issue a new AMRMToken as if RM rolled master key Token newAMRMToken = Token.newInstance(new byte[0], @@ -455,7 +460,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } public void setWaitForSyncNextAllocate(boolean wait) { - synchronized (syncObj) { + synchronized (allocateSyncObj) { shouldWaitForSyncNextAllocate = wait; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d22d3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java index b6bb0da..54e7dd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.util.AsyncCallback; import org.junit.Assert; @@ -65,7 +67,8 @@ public class TestUnmanagedApplicationManager { ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); uam = new TestableUnmanagedApplicationManager(conf, - attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true, + "rm"); } protected void waitForCallBackCountAndCheckZeroPending( @@ -121,7 +124,8 @@ public class TestUnmanagedApplicationManager { MockResourceManagerFacade rmProxy = uam.getRMProxy(); uam = new TestableUnmanagedApplicationManager(conf, - attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true); + attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true, + "rm"); uam.setRMProxy(rmProxy); reAttachUAM(null, attemptId); @@ -186,7 +190,7 @@ public class TestUnmanagedApplicationManager { }); // Sync obj from mock RM - Object syncObj = MockResourceManagerFacade.getSyncObj(); + Object syncObj = MockResourceManagerFacade.getRegisterSyncObj(); // Wait for register call in the thread get into RM and then wake us synchronized (syncObj) { @@ -365,16 +369,24 @@ public class TestUnmanagedApplicationManager { /** * Testable UnmanagedApplicationManager that talks to a mock RM. */ - public static class TestableUnmanagedApplicationManager + public class TestableUnmanagedApplicationManager extends UnmanagedApplicationManager { private MockResourceManagerFacade rmProxy; public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, + String rmName) { super(conf, appId, queueName, submitter, appNameSuffix, - keepContainersAcrossApplicationAttempts, "TEST"); + keepContainersAcrossApplicationAttempts, rmName); + } + + @Override + protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler( + Configuration config, ApplicationId appId, + AMRMClientRelayer rmProxyRelayer) { + return new TestableAMRequestHandlerThread(config, appId, rmProxyRelayer); } @SuppressWarnings("unchecked") @@ -402,4 +414,31 @@ public class TestUnmanagedApplicationManager { } } + /** + * Wrap the handler thread so it calls from the same user. + */ + public class TestableAMRequestHandlerThread + extends AMHeartbeatRequestHandler { + public TestableAMRequestHandlerThread(Configuration conf, + ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) { + super(conf, applicationId, rmProxyRelayer); + } + + @Override + public void run() { + try { + getUGIWithToken(attemptId) + .doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() { + TestableAMRequestHandlerThread.super.run(); + return null; + } + }); + } catch (Exception e) { + LOG.error("Exception running TestableAMRequestHandlerThread", e); + } + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d22d3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 4267945..c478871 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -167,6 +167,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink; + private Map<SubClusterId, RegisterApplicationMasterResponse> uamRegistrations; + + // For unit test synchronization + private Map<SubClusterId, Future<?>> uamRegisterFutures; + /** Thread pool used for asynchronous operations. */ private ExecutorService threadpool; @@ -227,6 +232,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public FederationInterceptor() { this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>(); this.asyncResponseSink = new ConcurrentHashMap<>(); + this.uamRegistrations = new ConcurrentHashMap<>(); + this.uamRegisterFutures = new ConcurrentHashMap<>(); this.threadpool = Executors.newCachedThreadPool(); this.uamPool = createUnmanagedAMPoolManager(this.threadpool); this.secondaryRelayers = new ConcurrentHashMap<>(); @@ -279,8 +286,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { ApplicationMasterProtocol.class, appOwner), appId, this.homeSubClusterId.toString()); - this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId); - this.homeHeartbeartHandler.setAMRMClientRelayer(this.homeRMRelayer); + this.homeHeartbeartHandler = + createHomeHeartbeartHandler(conf, appId, this.homeRMRelayer); this.homeHeartbeartHandler.setUGI(appOwner); this.homeHeartbeartHandler.setDaemon(true); this.homeHeartbeartHandler.start(); @@ -615,10 +622,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { /** * Send the requests to the all sub-cluster resource managers. All * requests are synchronously triggered but sent asynchronously. Later the - * responses will be collected and merged. In addition, it also returns - * the newly registered UAMs. + * responses will be collected and merged. */ - Registrations newRegistrations = sendRequestsToResourceManagers(requests); + sendRequestsToResourceManagers(requests); // Wait for the first async response to arrive long startTime = this.clock.getTime(); @@ -646,9 +652,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // Merge the containers and NMTokens from the new registrations into // the response - if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) { - mergeRegistrationResponses(response, - newRegistrations.getSuccessfulRegistrations()); + + if (!isNullOrEmpty(this.uamRegistrations)) { + Map<SubClusterId, RegisterApplicationMasterResponse> newRegistrations; + synchronized (this.uamRegistrations) { + newRegistrations = new HashMap<>(this.uamRegistrations); + this.uamRegistrations.clear(); + } + mergeRegistrationResponses(response, newRegistrations); } // update the responseId and return the final response to AM @@ -850,8 +861,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { @VisibleForTesting protected AMHeartbeatRequestHandler createHomeHeartbeartHandler( - Configuration conf, ApplicationId appId) { - return new AMHeartbeatRequestHandler(conf, appId); + Configuration conf, ApplicationId appId, + AMRMClientRelayer rmProxyRelayer) { + return new AMHeartbeatRequestHandler(conf, appId, rmProxyRelayer); } /** @@ -1107,18 +1119,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * * @param requests contains the heart beat requests to send to the resource * manager keyed by the sub-cluster id - * @return the registration responses from the newly added sub-cluster - * resource managers * @throws YarnException * @throws IOException */ - private Registrations sendRequestsToResourceManagers( + private void sendRequestsToResourceManagers( Map<SubClusterId, AllocateRequest> requests) throws YarnException, IOException { - // Create new UAM instances for the sub-cluster that we have not seen - // before - Registrations registrations = registerWithNewSubClusters(requests.keySet()); + // Create new UAM instances for the sub-cluster that we haven't seen before + List<SubClusterId> newSubClusters = + registerAndAllocateWithNewSubClusters(requests); // Now that all the registrations are done, send the allocation request // to the sub-cluster RMs asynchronously and don't wait for the response. @@ -1126,6 +1136,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // response sink, then merged and sent to the application master. for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) { SubClusterId subClusterId = entry.getKey(); + if (newSubClusters.contains(subClusterId)) { + // For new sub-clusters, we have already sent the request right after + // register in the async thread + continue; + } if (subClusterId.equals(this.homeSubClusterId)) { // Request for the home sub-cluster resource manager @@ -1133,131 +1148,100 @@ public class FederationInterceptor extends AbstractRequestInterceptor { new HeartbeatCallBack(this.homeSubClusterId, false)); } else { if (!this.uamPool.hasUAMId(subClusterId.getId())) { - // TODO: This means that the registration for this sub-cluster RM - // failed. For now, we ignore the resource requests and continue - // but we need to fix this and handle this situation. One way would - // be to send the request to another RM by consulting the policy. - LOG.warn("Unmanaged AM registration not found for sub-cluster {}", - subClusterId); - continue; + throw new YarnException("UAM not found for " + this.attemptId + + " in sub-cluster " + subClusterId); } this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(), new HeartbeatCallBack(subClusterId, true)); } } - - return registrations; } /** - * This method ensures that Unmanaged AMs are created for each of the - * specified sub-cluster specified in the input and registers with the - * corresponding resource managers. + * This method ensures that Unmanaged AMs are created for newly specified + * sub-clusters, registers with the corresponding resource managers and send + * the first allocate request async. */ - private Registrations registerWithNewSubClusters( - Set<SubClusterId> subClusterSet) throws IOException { - - List<SubClusterId> failedRegistrations = new ArrayList<>(); - Map<SubClusterId, RegisterApplicationMasterResponse> - successfulRegistrations = new HashMap<>(); + private List<SubClusterId> registerAndAllocateWithNewSubClusters( + final Map<SubClusterId, AllocateRequest> requests) throws IOException { // Check to see if there are any new sub-clusters in this request // list and create and register Unmanaged AM instance for the new ones - List<String> newSubClusters = new ArrayList<>(); - for (SubClusterId subClusterId : subClusterSet) { + List<SubClusterId> newSubClusters = new ArrayList<>(); + for (SubClusterId subClusterId : requests.keySet()) { if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(subClusterId.getId())) { - newSubClusters.add(subClusterId.getId()); + newSubClusters.add(subClusterId); } } - if (newSubClusters.size() > 0) { - final RegisterApplicationMasterRequest registerRequest = - this.amRegistrationRequest; - final AMRMProxyApplicationContext appContext = getApplicationContext(); - ExecutorCompletionService<RegisterApplicationMasterResponseInfo> - completionService = new ExecutorCompletionService<>(this.threadpool); + this.uamRegisterFutures.clear(); + for (final SubClusterId scId : newSubClusters) { + Future<?> future = this.threadpool.submit(new Runnable() { + @Override + public void run() { + String subClusterId = scId.getId(); - for (final String subClusterId : newSubClusters) { - completionService - .submit(new Callable<RegisterApplicationMasterResponseInfo>() { - @Override - public RegisterApplicationMasterResponseInfo call() - throws Exception { - - // Create a config loaded with federation on and subclusterId - // for each UAM - YarnConfiguration config = new YarnConfiguration(getConf()); - FederationProxyProviderUtil.updateConfForFederation(config, - subClusterId); - - RegisterApplicationMasterResponse uamResponse = null; - Token<AMRMTokenIdentifier> token = null; - try { - // For appNameSuffix, use subClusterId of the home sub-cluster - token = uamPool.launchUAM(subClusterId, config, - attemptId.getApplicationId(), - amRegistrationResponse.getQueue(), appContext.getUser(), - homeSubClusterId.toString(), true, subClusterId); - - secondaryRelayers.put(subClusterId, - uamPool.getAMRMClientRelayer(subClusterId)); - - uamResponse = uamPool.registerApplicationMaster(subClusterId, - registerRequest); - } catch (Throwable e) { - LOG.error("Failed to register application master: " - + subClusterId + " Application: " + attemptId, e); - } - return new RegisterApplicationMasterResponseInfo(uamResponse, - SubClusterId.newInstance(subClusterId), token); - } - }); - } + // Create a config loaded with federation on and subclusterId + // for each UAM + YarnConfiguration config = new YarnConfiguration(getConf()); + FederationProxyProviderUtil.updateConfForFederation(config, + subClusterId); - // Wait for other sub-cluster resource managers to return the - // response and add it to the Map for returning to the caller - for (int i = 0; i < newSubClusters.size(); ++i) { - try { - Future<RegisterApplicationMasterResponseInfo> future = - completionService.take(); - RegisterApplicationMasterResponseInfo uamResponse = future.get(); - if (LOG.isDebugEnabled()) { - LOG.debug("Received register application response from RM: " - + uamResponse.getSubClusterId()); + RegisterApplicationMasterResponse uamResponse = null; + Token<AMRMTokenIdentifier> token = null; + try { + // For appNameSuffix, use subClusterId of the home sub-cluster + token = uamPool.launchUAM(subClusterId, config, + attemptId.getApplicationId(), amRegistrationResponse.getQueue(), + getApplicationContext().getUser(), homeSubClusterId.toString(), + true, subClusterId); + + secondaryRelayers.put(subClusterId, + uamPool.getAMRMClientRelayer(subClusterId)); + + uamResponse = uamPool.registerApplicationMaster(subClusterId, + amRegistrationRequest); + } catch (Throwable e) { + LOG.error("Failed to register application master: " + subClusterId + + " Application: " + attemptId, e); + // TODO: UAM registration for this sub-cluster RM + // failed. For now, we ignore the resource requests and continue + // but we need to fix this and handle this situation. One way would + // be to send the request to another RM by consulting the policy. + return; } + uamRegistrations.put(scId, uamResponse); + LOG.info("Successfully registered unmanaged application master: " + + subClusterId + " ApplicationId: " + attemptId); - if (uamResponse.getResponse() == null) { - failedRegistrations.add(uamResponse.getSubClusterId()); - } else { - LOG.info("Successfully registered unmanaged application master: " - + uamResponse.getSubClusterId() + " ApplicationId: " - + this.attemptId); - successfulRegistrations.put(uamResponse.getSubClusterId(), - uamResponse.getResponse()); + try { + uamPool.allocateAsync(subClusterId, requests.get(scId), + new HeartbeatCallBack(scId, true)); + } catch (Throwable e) { + LOG.error("Failed to allocate async to " + subClusterId + + " Application: " + attemptId, e); + } - // Save the UAM token in registry or NMSS + // Save the UAM token in registry or NMSS + try { if (registryClient != null) { - registryClient.writeAMRMTokenForUAM( - this.attemptId.getApplicationId(), - uamResponse.getSubClusterId().getId(), - uamResponse.getUamToken()); + registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(), + subClusterId, token); } else if (getNMStateStore() != null) { - getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId, - NMSS_SECONDARY_SC_PREFIX - + uamResponse.getSubClusterId().getId(), - uamResponse.getUamToken().encodeToUrlString() - .getBytes(STRING_TO_BYTE_FORMAT)); + getNMStateStore().storeAMRMProxyAppContextEntry(attemptId, + NMSS_SECONDARY_SC_PREFIX + subClusterId, + token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT)); } + } catch (Throwable e) { + LOG.error("Failed to persist UAM token from " + subClusterId + + " Application: " + attemptId, e); } - } catch (Exception e) { - LOG.warn("Failed to register unmanaged application master: " - + " ApplicationId: " + this.attemptId, e); } - } + }); + this.uamRegisterFutures.put(scId, future); } - - return new Registrations(successfulRegistrations, failedRegistrations); + return newSubClusters; } /** @@ -1573,7 +1557,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } @VisibleForTesting - public int getUnmanagedAMPoolSize() { + protected int getUnmanagedAMPoolSize() { return this.uamPool.getAllUAMIds().size(); } @@ -1583,6 +1567,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } @VisibleForTesting + protected Map<SubClusterId, Future<?>> getUamRegisterFutures() { + return this.uamRegisterFutures; + } + + @VisibleForTesting public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() { return this.asyncResponseSink; } @@ -1614,7 +1603,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { asyncResponseSink.notifyAll(); } + // Notify policy of allocate response + try { + policyInterpreter.notifyOfResponse(subClusterId, response); + } catch (YarnException e) { + LOG.warn("notifyOfResponse for policy failed for sub-cluster " + + subClusterId, e); + } + // Save the new AMRMToken for the UAM if present + // Do this last because it can be slow... if (this.isUAM && response.getAMRMToken() != null) { Token<AMRMTokenIdentifier> newToken = ConverterUtils .convertFromYarn(response.getAMRMToken(), (Text) null); @@ -1648,44 +1646,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } } - - // Notify policy of allocate response - try { - policyInterpreter.notifyOfResponse(subClusterId, response); - } catch (YarnException e) { - LOG.warn("notifyOfResponse for policy failed for sub-cluster " - + subClusterId, e); - } - } - } - - /** - * Private structure for encapsulating SubClusterId and - * RegisterApplicationMasterResponse instances. - */ - private static class RegisterApplicationMasterResponseInfo { - private RegisterApplicationMasterResponse response; - private SubClusterId subClusterId; - private Token<AMRMTokenIdentifier> uamToken; - - RegisterApplicationMasterResponseInfo( - RegisterApplicationMasterResponse response, SubClusterId subClusterId, - Token<AMRMTokenIdentifier> uamToken) { - this.response = response; - this.subClusterId = subClusterId; - this.uamToken = uamToken; - } - - public RegisterApplicationMasterResponse getResponse() { - return response; - } - - public SubClusterId getSubClusterId() { - return subClusterId; - } - - public Token<AMRMTokenIdentifier> getUamToken() { - return uamToken; } } @@ -1713,33 +1673,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** - * Private structure for encapsulating successful and failed application - * master registration responses. - */ - private static class Registrations { - private Map<SubClusterId, RegisterApplicationMasterResponse> - successfulRegistrations; - private List<SubClusterId> failedRegistrations; - - Registrations( - Map<SubClusterId, RegisterApplicationMasterResponse> - successfulRegistrations, - List<SubClusterId> failedRegistrations) { - this.successfulRegistrations = successfulRegistrations; - this.failedRegistrations = failedRegistrations; - } - - public Map<SubClusterId, RegisterApplicationMasterResponse> - getSuccessfulRegistrations() { - return this.successfulRegistrations; - } - - public List<SubClusterId> getFailedRegistrations() { - return this.failedRegistrations; - } - } - - /** * Utility method to check if the specified Collection is null or empty. * * @param c the collection object http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d22d3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 407ae83..ec75cfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -201,9 +201,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { LOG.info("Number of allocated containers in the original request: " + Integer.toString(allocateResponse.getAllocatedContainers().size())); - // Make sure this request is picked up by all async heartbeat handlers - interceptor.drainAllAsyncQueue(false); - // Send max 10 heart beats to receive all the containers. If not, we will // fail the test int numHeartbeat = 0; @@ -217,8 +214,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { checkAMRMToken(allocateResponse.getAMRMToken()); lastResponseId = allocateResponse.getResponseId(); - containers.addAll(allocateResponse.getAllocatedContainers()); + // Make sure this request is picked up by all async heartbeat handlers + interceptor.drainAllAsyncQueue(false); + containers.addAll(allocateResponse.getAllocatedContainers()); LOG.info("Number of allocated containers in this request: " + Integer.toString(allocateResponse.getAllocatedContainers().size())); LOG.info("Total number of allocated containers: " @@ -258,9 +257,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { LOG.info("Number of containers received in the original request: " + Integer.toString(newlyFinished.size())); - // Make sure this request is picked up by all async heartbeat handlers - interceptor.drainAllAsyncQueue(false); - // Send max 10 heart beats to receive all the containers. If not, we will // fail the test int numHeartbeat = 0; @@ -273,10 +269,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { checkAMRMToken(allocateResponse.getAMRMToken()); lastResponseId = allocateResponse.getResponseId(); + // Make sure this request is picked up by all async heartbeat handlers + interceptor.drainAllAsyncQueue(false); + newlyFinished = getCompletedContainerIds( allocateResponse.getCompletedContainersStatuses()); containersForReleasedContainerIds.addAll(newlyFinished); - LOG.info("Number of containers received in this request: " + Integer.toString(newlyFinished.size())); LOG.info("Total number of containers received: " @@ -438,7 +436,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { ExecutorCompletionService<RegisterApplicationMasterResponse> compSvc = new ExecutorCompletionService<>(threadpool); - Object syncObj = MockResourceManagerFacade.getSyncObj(); + Object syncObj = MockResourceManagerFacade.getRegisterSyncObj(); // Two register threads synchronized (syncObj) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c3d22d3b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java index 78f6eb0..7c57ab1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; @@ -69,8 +70,9 @@ public class TestableFederationInterceptor extends FederationInterceptor { @Override protected AMHeartbeatRequestHandler createHomeHeartbeartHandler( - Configuration conf, ApplicationId appId) { - return new TestableAMRequestHandlerThread(conf, appId); + Configuration conf, ApplicationId appId, + AMRMClientRelayer rmProxyRelayer) { + return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer); } @SuppressWarnings("unchecked") @@ -205,7 +207,8 @@ public class TestableFederationInterceptor extends FederationInterceptor { String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, String rmId) { return new TestableUnmanagedApplicationManager(conf, appId, queueName, - submitter, appNameSuffix, keepContainersAcrossApplicationAttempts); + submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, + rmId); } } @@ -218,10 +221,17 @@ public class TestableFederationInterceptor extends FederationInterceptor { public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, - String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) { + String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, + String rmName) { super(conf, appId, queueName, submitter, appNameSuffix, - keepContainersAcrossApplicationAttempts, "TEST"); - setHandlerThread(new TestableAMRequestHandlerThread(conf, appId)); + keepContainersAcrossApplicationAttempts, rmName); + } + + @Override + protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler( + Configuration conf, ApplicationId appId, + AMRMClientRelayer rmProxyRelayer) { + return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer); } /** @@ -244,8 +254,8 @@ public class TestableFederationInterceptor extends FederationInterceptor { protected class TestableAMRequestHandlerThread extends AMHeartbeatRequestHandler { public TestableAMRequestHandlerThread(Configuration conf, - ApplicationId applicationId) { - super(conf, applicationId); + ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) { + super(conf, applicationId, rmProxyRelayer); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
