YARN-8696. [AMRMProxy] FederationInterceptor upgrade: home sub-cluster heartbeat async. Contributed by Botong Huang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69379258 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69379258 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69379258 Branch: refs/heads/branch-2 Commit: 69379258380e7755717f5a3efcd43c341f4a8a0d Parents: 6056597 Author: Giovanni Matteo Fumarola <gif...@apache.com> Authored: Mon Sep 24 11:40:07 2018 -0700 Committer: Giovanni Matteo Fumarola <gif...@apache.com> Committed: Mon Sep 24 11:40:07 2018 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../yarn/conf/TestYarnConfigurationFields.java | 2 + .../hadoop/yarn/client/AMRMClientUtils.java | 8 + .../yarn/server/AMHeartbeatRequestHandler.java | 23 +- .../utils/FederationRegistryClient.java | 10 +- .../yarn/server/uam/UnmanagedAMPoolManager.java | 15 + .../server/uam/UnmanagedApplicationManager.java | 14 + .../yarn/server/MockResourceManagerFacade.java | 72 ++-- .../amrmproxy/FederationInterceptor.java | 376 +++++++++++-------- .../amrmproxy/TestFederationInterceptor.java | 298 +++++++++------ .../TestableFederationInterceptor.java | 107 ++++++ .../ApplicationMasterService.java | 15 +- 12 files changed, 635 insertions(+), 310 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0e26805..2883705 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2835,6 +2835,11 @@ public class YarnConfiguration extends Configuration { "org.apache.hadoop.yarn.server.federation.resolver." + "DefaultSubClusterResolverImpl"; + // the maximum wait time for the first async heartbeat response + public static final String FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS = + FEDERATION_PREFIX + "amrmproxy.hb.maximum.wait.ms"; + public static final long DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS = 5000; + // AMRMProxy split-merge timeout for active sub-clusters. We will not route // new asks to expired sub-clusters. public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT = http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 8d6f72f..d414991 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -102,6 +102,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS); configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS); + configurationPropsToSkipCompare .add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT); // Federation StateStore ZK implementation configs to be ignored http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java index cddb9be..e95ef2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -44,6 +44,8 @@ public final class AMRMClientUtils { private static final Logger LOG = LoggerFactory.getLogger(AMRMClientUtils.class); + public static final int PRE_REGISTER_RESPONSE_ID = -1; + public static final String APP_ALREADY_REGISTERED_MESSAGE = "Application Master is already registered : "; @@ -143,4 +145,10 @@ public final class AMRMClientUtils { return -1; } } + + public static int getNextResponseId(int responseId) { + // Loop between 0 to Integer.MAX_VALUE + return (responseId + 1) & Integer.MAX_VALUE; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java index 42227bb..380c216 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java @@ -47,6 +47,9 @@ public class AMHeartbeatRequestHandler extends Thread { // Indication flag for the thread to keep running private volatile boolean keepRunning; + // For unit test draining + private volatile boolean isThreadWaiting; + private Configuration conf; private ApplicationId applicationId; @@ -61,6 +64,7 @@ public class AMHeartbeatRequestHandler extends Thread { this.setUncaughtExceptionHandler( new HeartBeatThreadUncaughtExceptionHandler()); this.keepRunning = true; + this.isThreadWaiting = false; this.conf = conf; this.applicationId = applicationId; @@ -82,12 +86,15 @@ public class AMHeartbeatRequestHandler extends Thread { while (keepRunning) { AsyncAllocateRequestInfo requestInfo; try { - requestInfo = requestQueue.take(); + this.isThreadWaiting = true; + requestInfo = this.requestQueue.take(); + this.isThreadWaiting = false; + if (requestInfo == null) { throw new YarnException( "Null requestInfo taken from request queue"); } - if (!keepRunning) { + if (!this.keepRunning) { break; } @@ -98,7 +105,7 @@ public class AMHeartbeatRequestHandler extends Thread { throw new YarnException("Null allocateRequest from requestInfo"); } if (LOG.isDebugEnabled()) { - LOG.debug("Sending Heartbeat to Unmanaged AM. AskList:" + LOG.debug("Sending Heartbeat to RM. AskList:" + ((request.getAskList() == null) ? " empty" : request.getAskList().size())); } @@ -182,6 +189,16 @@ public class AMHeartbeatRequestHandler extends Thread { } @VisibleForTesting + public void drainHeartbeatThread() { + while (!this.isThreadWaiting || this.requestQueue.size() > 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + + @VisibleForTesting public int getRequestQueueSize() { return this.requestQueue.size(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java index 6624318..13545c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -78,7 +78,7 @@ public class FederationRegistryClient { * * @return the list of known applications */ - public List<String> getAllApplications() { + public synchronized List<String> getAllApplications() { // Suppress the exception here because it is valid that the entry does not // exist List<String> applications = null; @@ -99,7 +99,7 @@ public class FederationRegistryClient { * For testing, delete all application records in registry. */ @VisibleForTesting - public void cleanAllApplications() { + public synchronized void cleanAllApplications() { try { removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null), true, false); @@ -115,7 +115,7 @@ public class FederationRegistryClient { * @param token the UAM of the application * @return whether the amrmToken is added or updated to a new value */ - public boolean writeAMRMTokenForUAM(ApplicationId appId, + public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId, String subClusterId, Token<AMRMTokenIdentifier> token) { Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap = this.appSubClusterTokenMap.get(appId); @@ -154,7 +154,7 @@ public class FederationRegistryClient { * @param appId application id * @return the sub-cluster to UAM token mapping */ - public Map<String, Token<AMRMTokenIdentifier>> + public synchronized Map<String, Token<AMRMTokenIdentifier>> loadStateFromRegistry(ApplicationId appId) { Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>(); // Suppress the exception here because it is valid that the entry does not @@ -203,7 +203,7 @@ public class FederationRegistryClient { * * @param appId application id */ - public void removeAppFromRegistry(ApplicationId appId) { + public synchronized void removeAppFromRegistry(ApplicationId appId) { Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap = this.appSubClusterTokenMap.get(appId); LOG.info("Removing all registry entries for {}", appId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index ca6fef0..10a1ddb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -406,4 +406,19 @@ public class UnmanagedAMPoolManager extends AbstractService { return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer(); } + @VisibleForTesting + public int getRequestQueueSize(String uamId) throws YarnException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + return this.unmanagedAppMasterMap.get(uamId).getRequestQueueSize(); + } + + @VisibleForTesting + public void drainUAMHeartbeats() { + for (UnmanagedApplicationManager uam : this.unmanagedAppMasterMap + .values()) { + uam.drainHeartbeatThread(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 6cc6cf3..608fbab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -225,6 +225,10 @@ public class UnmanagedApplicationManager { LOG.debug("RegisterUAM returned existing NM token for node " + nmToken.getNodeId()); } + LOG.info( + "RegisterUAM returned {} existing running container and {} NM tokens", + response.getContainersFromPreviousAttempts().size(), + response.getNMTokensFromPreviousAttempts().size()); // Only when register succeed that we start the heartbeat thread this.heartbeatHandler.setDaemon(true); @@ -516,4 +520,14 @@ public class UnmanagedApplicationManager { public int getRequestQueueSize() { return this.heartbeatHandler.getRequestQueueSize(); } + + @VisibleForTesting + protected void setHandlerThread(AMHeartbeatRequestHandler thread) { + this.heartbeatHandler = thread; + } + + @VisibleForTesting + protected void drainHeartbeatThread() { + this.heartbeatHandler.drainHeartbeatThread(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 97e3866..47f85cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -174,8 +174,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, private HashSet<ApplicationId> applicationMap = new HashSet<>(); private HashSet<ApplicationId> keepContainerOnUams = new HashSet<>(); - private HashMap<ApplicationAttemptId, List<ContainerId>> - applicationContainerIdMap = new HashMap<>(); + private HashMap<ApplicationId, List<ContainerId>> applicationContainerIdMap = + new HashMap<>(); + private int rmId; private AtomicInteger containerIndex = new AtomicInteger(0); private Configuration conf; private int subClusterId; @@ -188,6 +189,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, private boolean shouldReRegisterNext = false; + private boolean shouldWaitForSyncNextAllocate = false; + // For unit test synchronization private static Object syncObj = new Object(); @@ -203,6 +206,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public MockResourceManagerFacade(Configuration conf, int startContainerIndex, int subClusterId, boolean isRunning) { this.conf = conf; + this.rmId = startContainerIndex; this.containerIndex.set(startContainerIndex); this.subClusterId = subClusterId; this.isRunning = isRunning; @@ -244,17 +248,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Registering application attempt: " + attemptId); + ApplicationId appId = attemptId.getApplicationId(); List<Container> containersFromPreviousAttempt = null; synchronized (applicationContainerIdMap) { - if (applicationContainerIdMap.containsKey(attemptId)) { - if (keepContainerOnUams.contains(attemptId.getApplicationId())) { + if (applicationContainerIdMap.containsKey(appId)) { + if (keepContainerOnUams.contains(appId)) { // For UAM with the keepContainersFromPreviousAttempt flag, return all // running containers containersFromPreviousAttempt = new ArrayList<>(); - for (ContainerId containerId : applicationContainerIdMap - .get(attemptId)) { + for (ContainerId containerId : applicationContainerIdMap.get(appId)) { containersFromPreviousAttempt.add(Container.newInstance(containerId, null, null, null, null, null)); } @@ -264,7 +268,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } } else { // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(attemptId, new ArrayList<ContainerId>()); + applicationContainerIdMap.put(appId, new ArrayList<ContainerId>()); } } @@ -299,6 +303,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Finishing application attempt: " + attemptId); + ApplicationId appId = attemptId.getApplicationId(); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -309,8 +314,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, synchronized (applicationContainerIdMap) { // Remove the containers that were being tracked for this application Assert.assertTrue("The application id is NOT registered: " + attemptId, - applicationContainerIdMap.containsKey(attemptId)); - applicationContainerIdMap.remove(attemptId); + applicationContainerIdMap.containsKey(appId)); + applicationContainerIdMap.remove(appId); } return FinishApplicationMasterResponse.newInstance( @@ -335,6 +340,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Allocate from application attempt: " + attemptId); + ApplicationId appId = attemptId.getApplicationId(); if (shouldReRegisterNext) { String message = "AM is not registered, should re-register."; @@ -342,6 +348,21 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, throw new ApplicationMasterNotRegisteredException(message); } + // Wait for signal for certain test cases + synchronized (syncObj) { + if (shouldWaitForSyncNextAllocate) { + shouldWaitForSyncNextAllocate = false; + + LOG.info("Allocate call in RM start waiting"); + try { + syncObj.wait(); + LOG.info("Allocate call in RM wait finished"); + } catch (InterruptedException e) { + LOG.info("Allocate call in RM wait interrupted", e); + } + } + } + ArrayList<Container> containerList = new ArrayList<Container>(); if (request.getAskList() != null) { for (ResourceRequest rr : request.getAskList()) { @@ -366,9 +387,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, // will need it in future Assert.assertTrue( "The application id is Not registered before allocate(): " - + attemptId, - applicationContainerIdMap.containsKey(attemptId)); - List<ContainerId> ids = applicationContainerIdMap.get(attemptId); + + appId, + applicationContainerIdMap.containsKey(appId)); + List<ContainerId> ids = applicationContainerIdMap.get(appId); ids.add(containerId); } } @@ -380,12 +401,10 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, && request.getReleaseList().size() > 0) { LOG.info("Releasing containers: " + request.getReleaseList().size()); synchronized (applicationContainerIdMap) { - Assert - .assertTrue( - "The application id is not registered before allocate(): " - + attemptId, - applicationContainerIdMap.containsKey(attemptId)); - List<ContainerId> ids = applicationContainerIdMap.get(attemptId); + Assert.assertTrue( + "The application id is not registered before allocate(): " + appId, + applicationContainerIdMap.containsKey(appId)); + List<ContainerId> ids = applicationContainerIdMap.get(appId); for (ContainerId id : request.getReleaseList()) { boolean found = false; @@ -411,7 +430,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, + " for application attempt: " + conf.get("AMRMTOKEN")); // Always issue a new AMRMToken as if RM rolled master key - Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); + Token newAMRMToken = Token.newInstance(new byte[0], + Integer.toString(this.rmId), new byte[0], ""); return AllocateResponse.newInstance(0, completedList, containerList, new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null, @@ -419,6 +439,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, new ArrayList<UpdatedContainer>(), null); } + public void setWaitForSyncNextAllocate(boolean wait) { + synchronized (syncObj) { + shouldWaitForSyncNextAllocate = wait; + } + } + @Override public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException { @@ -610,14 +636,14 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); - ApplicationAttemptId attemptId = request.getApplicationAttemptId(); + ApplicationId appId = request.getApplicationAttemptId().getApplicationId(); List<ContainerReport> containers = new ArrayList<>(); synchronized (applicationContainerIdMap) { // Return the list of running containers that were being tracked for this // application - Assert.assertTrue("The application id is NOT registered: " + attemptId, - applicationContainerIdMap.containsKey(attemptId)); - List<ContainerId> ids = applicationContainerIdMap.get(attemptId); + Assert.assertTrue("The application id is NOT registered: " + appId, + applicationContainerIdMap.containsKey(appId)); + List<ContainerId> ids = applicationContainerIdMap.get(appId); for (ContainerId c : ids) { containers.add(ContainerReport.newInstance(c, null, null, null, 0, 0, null, null, 0, null, null)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 1bf882f..c02296d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -62,14 +64,18 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler; import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; @@ -80,9 +86,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager; -import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,6 +122,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor { NMSS_CLASS_PREFIX + "secondarySC/"; public static final String STRING_TO_BYTE_FORMAT = "UTF-8"; + private static final RecordFactory RECORD_FACTORY = + RecordFactoryProvider.getRecordFactory(null); + + /** + * From AM's perspective, FederationInterceptor behaves exactly the same as + * YarnRM (ApplicationMasterService). This is to remember the last heart beat + * response, used to handle duplicate heart beat and responseId from AM. + */ + private AllocateResponse lastAllocateResponse; + private final Object lastAllocateResponseLock = new Object(); + private ApplicationAttemptId attemptId; /** @@ -124,7 +141,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private AMRMClientRelayer homeRMRelayer; private SubClusterId homeSubClusterId; - private volatile int lastHomeResponseId; + private AMHeartbeatRequestHandler homeHeartbeartHandler; /** * UAM pool for secondary sub-clusters (ones other than home sub-cluster), @@ -146,7 +163,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { /** * Stores the AllocateResponses that are received asynchronously from all the - * sub-cluster resource managers except the home RM. + * sub-cluster resource managers, including home RM. */ private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink; @@ -194,14 +211,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor { /** The policy used to split requests among sub-clusters. */ private FederationAMRMProxyPolicy policyInterpreter; - /** - * The proxy ugi used to talk to home RM, loaded with the up-to-date AMRMToken - * issued by home RM. - */ - private UserGroupInformation appOwner; - private FederationRegistryClient registryClient; + // the maximum wait time for the first async heart beat response + private long heartbeatMaxWaitTimeMs; + + private MonotonicClock clock = new MonotonicClock(); + /** * Creates an instance of the FederationInterceptor class. */ @@ -213,7 +229,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.secondaryRelayers = new ConcurrentHashMap<>(); this.amRegistrationRequest = null; this.amRegistrationResponse = null; - this.lastHomeResponseId = Integer.MAX_VALUE; this.justRecovered = false; } @@ -233,8 +248,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { setConf(conf); } + // The proxy ugi used to talk to home RM as well as Yarn Registry, loaded + // with the up-to-date AMRMToken issued by home RM. + UserGroupInformation appOwner; try { - this.appOwner = UserGroupInformation.createProxyUser(appContext.getUser(), + appOwner = UserGroupInformation.createProxyUser(appContext.getUser(), UserGroupInformation.getCurrentUser()); } catch (Exception ex) { throw new YarnRuntimeException(ex); @@ -242,10 +260,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (appContext.getRegistryClient() != null) { this.registryClient = new FederationRegistryClient(conf, - appContext.getRegistryClient(), this.appOwner); + appContext.getRegistryClient(), appOwner); // Add all app tokens for Yarn Registry access if (appContext.getCredentials() != null) { - this.appOwner.addCredentials(appContext.getCredentials()); + appOwner.addCredentials(appContext.getCredentials()); } } @@ -254,9 +272,21 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext, - ApplicationMasterProtocol.class, this.appOwner), appId, + ApplicationMasterProtocol.class, appOwner), appId, this.homeSubClusterId.toString()); + this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId); + this.homeHeartbeartHandler.setAMRMClientRelayer(this.homeRMRelayer); + this.homeHeartbeartHandler.setUGI(appOwner); + this.homeHeartbeartHandler.setDaemon(true); + this.homeHeartbeartHandler.start(); + + // set lastResponseId to -1 before application master registers + this.lastAllocateResponse = + RECORD_FACTORY.newRecordInstance(AllocateResponse.class); + this.lastAllocateResponse + .setResponseId(AMRMClientUtils.PRE_REGISTER_RESPONSE_ID); + this.federationFacade = FederationStateStoreFacade.getInstance(); this.subClusterResolver = this.federationFacade.getSubClusterResolver(); @@ -265,6 +295,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.uamPool.init(conf); this.uamPool.start(); + + this.heartbeatMaxWaitTimeMs = + conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS, + YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS); } @Override @@ -272,6 +306,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { super.recover(recoveredDataMap); LOG.info("Recovering data for FederationInterceptor for {}", this.attemptId); + this.justRecovered = true; + if (recoveredDataMap == null) { return; } @@ -294,9 +330,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl(pb); LOG.info("amRegistrationResponse recovered for {}", this.attemptId); - // Trigger re-register and full pending re-send only if we have a - // saved register response. This should always be true though. - this.justRecovered = true; } // Recover UAM amrmTokens from registry or NMSS @@ -355,6 +388,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { .getContainersFromPreviousAttempts()) { containerIdToSubClusterIdMap.put(container.getId(), subClusterId); containers++; + LOG.debug(" From subcluster " + subClusterId + + " running container " + container.getId()); } LOG.info("Recovered {} running containers from UAM in {}", response.getContainersFromPreviousAttempts().size(), @@ -384,7 +419,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { LOG.debug(" From home RM " + this.homeSubClusterId + " running container " + container.getContainerId()); } - LOG.info("{} running containers including AM recovered from home RM ", + LOG.info("{} running containers including AM recovered from home RM {}", response.getContainerList().size(), this.homeSubClusterId); LOG.info( @@ -411,8 +446,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * so that when AM registers more than once, it returns the same register * success response instead of throwing * {@link InvalidApplicationMasterRequestException}. Furthermore, we present - * to AM as if we are the RM that never fails over. When actual RM fails over, - * we always re-register automatically. + * to AM as if we are the RM that never fails over (except when AMRMProxy + * restarts). When actual RM fails over, we always re-register automatically. * * We did this because FederationInterceptor can receive concurrent register * requests from AM because of timeout between AM and AMRMProxy, which is @@ -425,6 +460,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public synchronized RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnException, IOException { + + // Reset the heartbeat responseId to zero upon register + synchronized (this.lastAllocateResponseLock) { + this.lastAllocateResponse.setResponseId(0); + } + this.justRecovered = false; + // If AM is calling with a different request, complain if (this.amRegistrationRequest != null) { if (!this.amRegistrationRequest.equals(request)) { @@ -524,34 +566,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ @Override public AllocateResponse allocate(AllocateRequest request) - throws YarnException { + throws YarnException, IOException { Preconditions.checkArgument(this.policyInterpreter != null, "Allocate should be called after registerApplicationMaster"); - if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) { - // Save the responseId home RM is expecting - this.lastHomeResponseId = request.getResponseId(); - + if (this.justRecovered) { throw new ApplicationMasterNotRegisteredException( "AMRMProxy just restarted and recovered for " + this.attemptId + ". AM should re-register and full re-send pending requests."); } - // Override responseId in the request in two cases: - // - // 1. After we just recovered after an NM restart and AM's responseId is - // reset due to the exception we generate. We need to override the - // responseId to the one homeRM expects. - // - // 2. After homeRM fail-over, the allocate response with reseted responseId - // might not be returned successfully back to AM because of RPC connection - // timeout between AM and AMRMProxy. In this case, we remember and reset the - // responseId for AM. - if (this.justRecovered - || request.getResponseId() > this.lastHomeResponseId) { - LOG.warn("Setting allocate responseId for {} from {} to {}", - this.attemptId, request.getResponseId(), this.lastHomeResponseId); - request.setResponseId(this.lastHomeResponseId); + // Check responseId and handle duplicate heartbeat exactly same as RM + synchronized (this.lastAllocateResponseLock) { + LOG.info("Heartbeat from " + this.attemptId + " with responseId " + + request.getResponseId() + " when we are expecting " + + this.lastAllocateResponse.getResponseId()); + // Normally request.getResponseId() == lastResponse.getResponseId() + if (AMRMClientUtils.getNextResponseId( + request.getResponseId()) == this.lastAllocateResponse + .getResponseId()) { + // heartbeat one step old, simply return lastReponse + return this.lastAllocateResponse; + } else if (request.getResponseId() != this.lastAllocateResponse + .getResponseId()) { + throw new InvalidApplicationMasterRequestException( + AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(attemptId, + this.lastAllocateResponse.getResponseId(), + request.getResponseId())); + } } try { @@ -560,71 +602,55 @@ public class FederationInterceptor extends AbstractRequestInterceptor { Map<SubClusterId, AllocateRequest> requests = splitAllocateRequest(request); - // Send the requests to the secondary sub-cluster resource managers. - // These secondary requests are send asynchronously and the responses will - // be collected and merged with the home response. In addition, it also - // return the newly registered Unmanaged AMs. - Registrations newRegistrations = - sendRequestsToSecondaryResourceManagers(requests); - - // Send the request to the home RM and get the response - AllocateRequest homeRequest = requests.get(this.homeSubClusterId); - LOG.info("{} heartbeating to home RM with responseId {}", this.attemptId, - homeRequest.getResponseId()); - - AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest); - - // Reset the flag after the first successful homeRM allocate response, - // otherwise keep overriding the responseId of new allocate request - if (this.justRecovered) { - this.justRecovered = false; + /** + * Send the requests to the all sub-cluster resource managers. All + * requests are synchronously triggered but sent asynchronously. Later the + * responses will be collected and merged. In addition, it also returns + * the newly registered UAMs. + */ + Registrations newRegistrations = sendRequestsToResourceManagers(requests); + + // Wait for the first async response to arrive + long startTime = this.clock.getTime(); + synchronized (this.asyncResponseSink) { + try { + this.asyncResponseSink.wait(this.heartbeatMaxWaitTimeMs); + } catch (InterruptedException e) { + } } + long firstResponseTime = this.clock.getTime() - startTime; - // Notify policy of home response + // An extra brief wait for other async heart beats, so that most of their + // responses can make it back to AM in the same heart beat round trip. try { - this.policyInterpreter.notifyOfResponse(this.homeSubClusterId, - homeResponse); - } catch (YarnException e) { - LOG.warn("notifyOfResponse for policy failed for home sub-cluster " - + this.homeSubClusterId, e); + Thread.sleep(firstResponseTime); + } catch (InterruptedException e) { } - // If the resource manager sent us a new token, add to the current user - if (homeResponse.getAMRMToken() != null) { - LOG.debug("Received new AMRMToken"); - YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(), - this.appOwner, getConf()); - } + // Prepare the response to AM + AllocateResponse response = + RECORD_FACTORY.newRecordInstance(AllocateResponse.class); - // Merge the responses from home and secondary sub-cluster RMs - homeResponse = mergeAllocateResponses(homeResponse); + // Merge all responses from response sink + mergeAllocateResponses(response); // Merge the containers and NMTokens from the new registrations into - // the homeResponse. + // the response if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) { - homeResponse = mergeRegistrationResponses(homeResponse, + mergeRegistrationResponses(response, newRegistrations.getSuccessfulRegistrations()); } - LOG.info("{} heartbeat response from home RM with responseId {}", - this.attemptId, homeResponse.getResponseId()); - - // Update lastHomeResponseId in three cases: - // 1. The normal responseId increments - // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails - // over, AMRMClientRelayer auto re-register and full re-send for homeRM. - // 3. lastHomeResponseId == MAX_INT. This is the initial case or - // responseId about to overflow and wrap around - if (homeResponse.getResponseId() == this.lastHomeResponseId + 1 - || homeResponse.getResponseId() == 1 - || this.lastHomeResponseId == Integer.MAX_VALUE) { - this.lastHomeResponseId = homeResponse.getResponseId(); + // update the responseId and return the final response to AM + synchronized (this.lastAllocateResponseLock) { + response.setResponseId(AMRMClientUtils + .getNextResponseId(this.lastAllocateResponse.getResponseId())); + this.lastAllocateResponse = response; } - - // return the final response to the application master. - return homeResponse; - } catch (IOException ex) { - LOG.error("Exception encountered while processing heart beat", ex); + return response; + } catch (Throwable ex) { + LOG.error("Exception encountered while processing heart beat for " + + this.attemptId, ex); throw new YarnException(ex); } } @@ -696,6 +722,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { FinishApplicationMasterResponse homeResponse = this.homeRMRelayer.finishApplicationMaster(request); + // Stop the home heartbeat thread + this.homeHeartbeartHandler.shutdown(); + if (subClusterIds.size() > 0) { // Wait for other sub-cluster resource managers to return the // response and merge it with the home response @@ -758,10 +787,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } this.threadpool = null; } - homeRMRelayer.shutdown(); - for(AMRMClientRelayer relayer : secondaryRelayers.values()){ + + // Stop the home heartbeat thread + this.homeHeartbeartHandler.shutdown(); + this.homeRMRelayer.shutdown(); + for (AMRMClientRelayer relayer : this.secondaryRelayers.values()) { relayer.shutdown(); } + super.shutdown(); } @@ -781,8 +814,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } @VisibleForTesting - protected int getLastHomeResponseId() { - return this.lastHomeResponseId; + protected ApplicationAttemptId getAttemptId() { + return this.attemptId; + } + + @VisibleForTesting + protected AMHeartbeatRequestHandler getHomeHeartbeartHandler() { + return this.homeHeartbeartHandler; } /** @@ -798,6 +836,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { return new UnmanagedAMPoolManager(threadPool); } + @VisibleForTesting + protected AMHeartbeatRequestHandler createHomeHeartbeartHandler( + Configuration conf, ApplicationId appId) { + return new AMHeartbeatRequestHandler(conf, appId); + } + /** * Create a proxy instance that is used to connect to the Home resource * manager. @@ -872,7 +916,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { + "Reattaching in parallel", uamMap.size(), appId); ExecutorCompletionService<RegisterApplicationMasterResponse> - completionService = new ExecutorCompletionService<>(threadpool); + completionService = new ExecutorCompletionService<>(this.threadpool); for (Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) { final SubClusterId subClusterId = @@ -1047,16 +1091,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { /** * This methods sends the specified AllocateRequests to the appropriate - * sub-cluster resource managers. + * sub-cluster resource managers asynchronously. * * @param requests contains the heart beat requests to send to the resource - * manager keyed by the resource manager address + * manager keyed by the sub-cluster id * @return the registration responses from the newly added sub-cluster * resource managers * @throws YarnException * @throws IOException */ - private Registrations sendRequestsToSecondaryResourceManagers( + private Registrations sendRequestsToResourceManagers( Map<SubClusterId, AllocateRequest> requests) throws YarnException, IOException { @@ -1065,32 +1109,29 @@ public class FederationInterceptor extends AbstractRequestInterceptor { Registrations registrations = registerWithNewSubClusters(requests.keySet()); // Now that all the registrations are done, send the allocation request - // to the sub-cluster RMs using the Unmanaged application masters - // asynchronously and don't wait for the response. The responses will - // arrive asynchronously and will be added to the response sink. These - // responses will be sent to the application master in some future heart - // beat response. + // to the sub-cluster RMs asynchronously and don't wait for the response. + // The responses will arrive asynchronously and will be added to the + // response sink, then merged and sent to the application master. for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) { - final SubClusterId subClusterId = entry.getKey(); + SubClusterId subClusterId = entry.getKey(); if (subClusterId.equals(this.homeSubClusterId)) { - // Skip the request for the home sub-cluster resource manager. - // It will be handled separately in the allocate() method - continue; - } - - if (!this.uamPool.hasUAMId(subClusterId.getId())) { - // TODO: This means that the registration for this sub-cluster RM - // failed. For now, we ignore the resource requests and continue - // but we need to fix this and handle this situation. One way would - // be to send the request to another RM by consulting the policy. - LOG.warn("Unmanaged AM registration not found for sub-cluster {}", - subClusterId); - continue; + // Request for the home sub-cluster resource manager + this.homeHeartbeartHandler.allocateAsync(entry.getValue(), + new HeartbeatCallBack(this.homeSubClusterId, false)); + } else { + if (!this.uamPool.hasUAMId(subClusterId.getId())) { + // TODO: This means that the registration for this sub-cluster RM + // failed. For now, we ignore the resource requests and continue + // but we need to fix this and handle this situation. One way would + // be to send the request to another RM by consulting the policy. + LOG.warn("Unmanaged AM registration not found for sub-cluster {}", + subClusterId); + continue; + } + this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(), + new HeartbeatCallBack(subClusterId, true)); } - - this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(), - new HeartbeatCallBack(subClusterId)); } return registrations; @@ -1123,7 +1164,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationRequest; final AMRMProxyApplicationContext appContext = getApplicationContext(); ExecutorCompletionService<RegisterApplicationMasterResponseInfo> - completionService = new ExecutorCompletionService<>(threadpool); + completionService = new ExecutorCompletionService<>(this.threadpool); for (final String subClusterId : newSubClusters) { completionService @@ -1208,21 +1249,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** - * Merges the responses from other sub-clusters that we received - * asynchronously with the specified home cluster response and keeps track of - * the containers received from each sub-cluster resource managers. + * Merge the responses from all sub-clusters that we received asynchronously + * and keeps track of the containers received from each sub-cluster resource + * managers. */ - private AllocateResponse mergeAllocateResponses( - AllocateResponse homeResponse) { - // Timing issue, we need to remove the completed and then save the new ones. - removeFinishedContainersFromCache( - homeResponse.getCompletedContainersStatuses()); - cacheAllocatedContainers(homeResponse.getAllocatedContainers(), - this.homeSubClusterId); - + private void mergeAllocateResponses(AllocateResponse mergedResponse) { synchronized (this.asyncResponseSink) { - for (Entry<SubClusterId, List<AllocateResponse>> entry : asyncResponseSink - .entrySet()) { + for (Entry<SubClusterId, List<AllocateResponse>> entry : + this.asyncResponseSink.entrySet()) { SubClusterId subClusterId = entry.getKey(); List<AllocateResponse> responses = entry.getValue(); if (responses.size() > 0) { @@ -1231,14 +1265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { response.getCompletedContainersStatuses()); cacheAllocatedContainers(response.getAllocatedContainers(), subClusterId); - mergeAllocateResponse(homeResponse, response, subClusterId); + mergeAllocateResponse(mergedResponse, response, subClusterId); } responses.clear(); } } } - - return homeResponse; } /** @@ -1256,11 +1288,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } /** - * Helper method for merging the responses from the secondary sub cluster RMs - * with the home response to return to the AM. + * Helper method for merging the registration responses from the secondary sub + * cluster RMs into the allocate response to return to the AM. */ - private AllocateResponse mergeRegistrationResponses( - AllocateResponse homeResponse, + private void mergeRegistrationResponses(AllocateResponse homeResponse, Map<SubClusterId, RegisterApplicationMasterResponse> registrations) { for (Entry<SubClusterId, RegisterApplicationMasterResponse> entry : @@ -1292,13 +1323,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } } - - return homeResponse; } private void mergeAllocateResponse(AllocateResponse homeResponse, AllocateResponse otherResponse, SubClusterId otherRMAddress) { + if (otherResponse.getAMRMToken() != null) { + // Propagate only the new amrmToken from home sub-cluster back to + // AMRMProxyService + if (otherRMAddress.equals(this.homeSubClusterId)) { + homeResponse.setAMRMToken(otherResponse.getAMRMToken()); + } else { + throw new YarnRuntimeException( + "amrmToken from UAM " + otherRMAddress + " should be null here"); + } + } + if (!isNullOrEmpty(otherResponse.getAllocatedContainers())) { if (!isNullOrEmpty(homeResponse.getAllocatedContainers())) { homeResponse.getAllocatedContainers() @@ -1406,9 +1446,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { SubClusterId subClusterId) { for (Container container : containers) { LOG.debug("Adding container {}", container); - if (containerIdToSubClusterIdMap.containsKey(container.getId())) { + + if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) { SubClusterId existingSubClusterId = - containerIdToSubClusterIdMap.get(container.getId()); + this.containerIdToSubClusterIdMap.get(container.getId()); if (existingSubClusterId.equals(subClusterId)) { /* * When RM fails over, the new RM master might send out the same @@ -1441,7 +1482,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } - containerIdToSubClusterIdMap.put(container.getId(), subClusterId); + this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId); } } @@ -1463,7 +1504,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { newRequest.setProgress(originalAMRequest.getProgress()); requestMap.put(subClusterId, newRequest); } - return newRequest; } @@ -1472,7 +1512,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private static AllocateRequest createAllocateRequest() { AllocateRequest request = - AllocateRequest.newInstance(0, 0, null, null, null); + RECORD_FACTORY.newRecordInstance(AllocateRequest.class); request.setAskList(new ArrayList<ResourceRequest>()); request.setReleaseList(new ArrayList<ContainerId>()); ResourceBlacklistRequest blackList = @@ -1526,6 +1566,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } @VisibleForTesting + protected UnmanagedAMPoolManager getUnmanagedAMPool() { + return this.uamPool; + } + + @VisibleForTesting public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() { return this.asyncResponseSink; } @@ -1535,9 +1580,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> { private SubClusterId subClusterId; + private boolean isUAM; - HeartbeatCallBack(SubClusterId subClusterId) { + HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) { this.subClusterId = subClusterId; + this.isUAM = isUAM; } @Override @@ -1551,16 +1598,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor { asyncResponseSink.put(subClusterId, responses); } responses.add(response); + // Notify main thread about the response arrival + asyncResponseSink.notifyAll(); } // Save the new AMRMToken for the UAM if present - if (response.getAMRMToken() != null) { + if (this.isUAM && response.getAMRMToken() != null) { Token<AMRMTokenIdentifier> newToken = ConverterUtils .convertFromYarn(response.getAMRMToken(), (Text) null); + // Do not further propagate the new amrmToken for UAM + response.setAMRMToken(null); + // Update the token in registry or NMSS if (registryClient != null) { - registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(), - subClusterId.getId(), newToken); + if (registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(), + subClusterId.getId(), newToken)) { + try { + AMRMTokenIdentifier identifier = new AMRMTokenIdentifier(); + identifier.readFields(new DataInputStream( + new ByteArrayInputStream(newToken.getIdentifier()))); + LOG.info( + "Received new UAM amrmToken with keyId {} and " + + "service {} from {} for {}, written to Registry", + identifier.getKeyId(), newToken.getService(), subClusterId, + attemptId); + } catch (IOException e) { + } + } } else if (getNMStateStore() != null) { try { getNMStateStore().storeAMRMProxyAppContextEntry(attemptId, @@ -1573,11 +1637,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } - // Notify policy of secondary sub-cluster responses + // Notify policy of allocate response try { policyInterpreter.notifyOfResponse(subClusterId, response); } catch (YarnException e) { - LOG.warn("notifyOfResponse for policy failed for home sub-cluster " + LOG.warn("notifyOfResponse for policy failed for sub-cluster " + subClusterId, e); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/69379258/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 9950b92..407ae83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -33,6 +33,7 @@ import java.util.concurrent.Executors; import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -95,6 +97,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { private int testAppId; private ApplicationAttemptId attemptId; + private volatile int lastResponseId; + @Override public void setUp() throws IOException { super.setUp(); @@ -120,6 +124,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), attemptId, "test-user", null, null, null, registry)); interceptor.cleanupRegistry(); + + lastResponseId = 0; } @Override @@ -174,8 +180,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { private List<Container> getContainersAndAssert(int numberOfResourceRequests, int numberOfAllocationExcepted) throws Exception { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(1); - List<Container> containers = new ArrayList<Container>(numberOfResourceRequests); List<ResourceRequest> askList = @@ -187,22 +191,31 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { allocateRequest.setAskList(askList); + allocateRequest.setResponseId(lastResponseId); AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull("allocate() returned null response", allocateResponse); + checkAMRMToken(allocateResponse.getAMRMToken()); + lastResponseId = allocateResponse.getResponseId(); containers.addAll(allocateResponse.getAllocatedContainers()); LOG.info("Number of allocated containers in the original request: " + Integer.toString(allocateResponse.getAllocatedContainers().size())); + // Make sure this request is picked up by all async heartbeat handlers + interceptor.drainAllAsyncQueue(false); + // Send max 10 heart beats to receive all the containers. If not, we will // fail the test int numHeartbeat = 0; while (containers.size() < numberOfAllocationExcepted && numHeartbeat++ < 10) { - allocateResponse = - interceptor.allocate(Records.newRecord(AllocateRequest.class)); + allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(lastResponseId); + allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull("allocate() returned null response", allocateResponse); + checkAMRMToken(allocateResponse.getAMRMToken()); + lastResponseId = allocateResponse.getResponseId(); containers.addAll(allocateResponse.getAllocatedContainers()); @@ -220,8 +233,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { throws Exception { Assert.assertTrue(containers.size() > 0); AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(1); - List<ContainerId> relList = new ArrayList<ContainerId>(containers.size()); for (Container container : containers) { relList.add(container.getId()); @@ -229,8 +240,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { allocateRequest.setReleaseList(relList); + allocateRequest.setResponseId(lastResponseId); AllocateResponse allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); + checkAMRMToken(allocateResponse.getAMRMToken()); + lastResponseId = allocateResponse.getResponseId(); // The release request will be split and handled by the corresponding UAM. // The release containers returned by the mock resource managers will be @@ -244,14 +258,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { LOG.info("Number of containers received in the original request: " + Integer.toString(newlyFinished.size())); + // Make sure this request is picked up by all async heartbeat handlers + interceptor.drainAllAsyncQueue(false); + // Send max 10 heart beats to receive all the containers. If not, we will // fail the test int numHeartbeat = 0; while (containersForReleasedContainerIds.size() < relList.size() && numHeartbeat++ < 10) { - allocateResponse = - interceptor.allocate(Records.newRecord(AllocateRequest.class)); + allocateRequest = Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(lastResponseId); + allocateResponse = interceptor.allocate(allocateRequest); Assert.assertNotNull(allocateResponse); + checkAMRMToken(allocateResponse.getAMRMToken()); + lastResponseId = allocateResponse.getResponseId(); + newlyFinished = getCompletedContainerIds( allocateResponse.getCompletedContainersStatuses()); containersForReleasedContainerIds.addAll(newlyFinished); @@ -267,65 +288,81 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { containersForReleasedContainerIds.size()); } + private void checkAMRMToken(Token amrmToken) { + if (amrmToken != null) { + // The token should be the one issued by home MockRM + Assert.assertTrue(amrmToken.getKind().equals(Integer.toString(0))); + } + } + @Test public void testMultipleSubClusters() throws Exception { + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); - // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(0); - registerReq.setTrackingUrl(""); + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the first batch of containers, with sc1 and sc2 active + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance("SC-2")); + + int numberOfContainers = 3; + List<Container> containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); - Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + // Allocate the second batch of containers, with sc1 and sc3 active + deRegisterSubCluster(SubClusterId.newInstance("SC-2")); + registerSubCluster(SubClusterId.newInstance("SC-3")); - // Allocate the first batch of containers, with sc1 and sc2 active - registerSubCluster(SubClusterId.newInstance("SC-1")); - registerSubCluster(SubClusterId.newInstance("SC-2")); + numberOfContainers = 1; + containers.addAll( + getContainersAndAssert(numberOfContainers, numberOfContainers * 2)); + Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); - int numberOfContainers = 3; - List<Container> containers = - getContainersAndAssert(numberOfContainers, numberOfContainers * 2); - Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); - - // Allocate the second batch of containers, with sc1 and sc3 active - deRegisterSubCluster(SubClusterId.newInstance("SC-2")); - registerSubCluster(SubClusterId.newInstance("SC-3")); - - numberOfContainers = 1; - containers.addAll( - getContainersAndAssert(numberOfContainers, numberOfContainers * 2)); - Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); - - // Allocate the third batch of containers with only in home sub-cluster - // active - deRegisterSubCluster(SubClusterId.newInstance("SC-1")); - deRegisterSubCluster(SubClusterId.newInstance("SC-3")); - registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); - - numberOfContainers = 2; - containers.addAll( - getContainersAndAssert(numberOfContainers, numberOfContainers * 1)); - Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); - - // Release all containers - releaseContainersAndAssert(containers); - - // Finish the application - FinishApplicationMasterRequest finishReq = - Records.newRecord(FinishApplicationMasterRequest.class); - finishReq.setDiagnostics(""); - finishReq.setTrackingUrl(""); - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - - FinishApplicationMasterResponse finshResponse = - interceptor.finishApplicationMaster(finishReq); - Assert.assertNotNull(finshResponse); - Assert.assertEquals(true, finshResponse.getIsUnregistered()); + // Allocate the third batch of containers with only in home sub-cluster + // active + deRegisterSubCluster(SubClusterId.newInstance("SC-1")); + deRegisterSubCluster(SubClusterId.newInstance("SC-3")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + + numberOfContainers = 2; + containers.addAll( + getContainersAndAssert(numberOfContainers, numberOfContainers * 1)); + Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize()); + + // Release all containers + releaseContainersAndAssert(containers); + + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + return null; + } + }); } /* @@ -333,49 +370,58 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { */ @Test public void testReregister() throws Exception { + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { - // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(0); - registerReq.setTrackingUrl(""); + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); - RegisterApplicationMasterResponse registerResponse = - interceptor.registerApplicationMaster(registerReq); - Assert.assertNotNull(registerResponse); + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; - Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); - // Allocate the first batch of containers - registerSubCluster(SubClusterId.newInstance("SC-1")); - registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); + // Allocate the first batch of containers + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance(HOME_SC_ID)); - interceptor.setShouldReRegisterNext(); + interceptor.setShouldReRegisterNext(); - int numberOfContainers = 3; - List<Container> containers = - getContainersAndAssert(numberOfContainers, numberOfContainers * 2); - Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + int numberOfContainers = 3; + List<Container> containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - interceptor.setShouldReRegisterNext(); + interceptor.setShouldReRegisterNext(); - // Release all containers - releaseContainersAndAssert(containers); + // Release all containers + releaseContainersAndAssert(containers); - interceptor.setShouldReRegisterNext(); + interceptor.setShouldReRegisterNext(); - // Finish the application - FinishApplicationMasterRequest finishReq = - Records.newRecord(FinishApplicationMasterRequest.class); - finishReq.setDiagnostics(""); - finishReq.setTrackingUrl(""); - finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); + // Finish the application + FinishApplicationMasterRequest finishReq = + Records.newRecord(FinishApplicationMasterRequest.class); + finishReq.setDiagnostics(""); + finishReq.setTrackingUrl(""); + finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - FinishApplicationMasterResponse finshResponse = - interceptor.finishApplicationMaster(finishReq); - Assert.assertNotNull(finshResponse); - Assert.assertEquals(true, finshResponse.getIsUnregistered()); + FinishApplicationMasterResponse finshResponse = + interceptor.finishApplicationMaster(finishReq); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + return null; + } + }); } /* @@ -442,6 +488,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { // Use port number 1001 to let mock RM block in the register call response = interceptor.registerApplicationMaster( RegisterApplicationMasterRequest.newInstance(null, 1001, null)); + lastResponseId = 0; } catch (Exception e) { LOG.info("Register thread exception", e); response = null; @@ -460,9 +507,11 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { testRecover(null); } - public void testRecover(final RegistryOperations registryObj) throws Exception { - ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); - userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() { + protected void testRecover(final RegistryOperations registryObj) + throws Exception { + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { interceptor = new TestableFederationInterceptor(); @@ -480,6 +529,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { RegisterApplicationMasterResponse registerResponse = interceptor.registerApplicationMaster(registerReq); Assert.assertNotNull(registerResponse); + lastResponseId = 0; Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); @@ -492,6 +542,9 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { getContainersAndAssert(numberOfContainers, numberOfContainers * 2); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + // Make sure all async hb threads are done + interceptor.drainAllAsyncQueue(true); + // Prepare for Federation Interceptor restart and recover Map<String, byte[]> recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId); @@ -517,22 +570,21 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor.recover(recoveredDataMap); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); - Assert.assertEquals(Integer.MAX_VALUE, - interceptor.getLastHomeResponseId()); // The first allocate call expects a fail-over exception and re-register - int responseId = 10; - AllocateRequest allocateRequest = - Records.newRecord(AllocateRequest.class); - allocateRequest.setResponseId(responseId); try { - interceptor.allocate(allocateRequest); + AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(lastResponseId); + AllocateResponse allocateResponse = + interceptor.allocate(allocateRequest); + lastResponseId = allocateResponse.getResponseId(); Assert.fail("Expecting an ApplicationMasterNotRegisteredException " + " after FederationInterceptor restarts and recovers"); } catch (ApplicationMasterNotRegisteredException e) { } interceptor.registerApplicationMaster(registerReq); - Assert.assertEquals(responseId, interceptor.getLastHomeResponseId()); + lastResponseId = 0; // Release all containers releaseContainersAndAssert(containers); @@ -614,6 +666,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { RegisterApplicationMasterResponse registerResponse = interceptor.registerApplicationMaster(registerReq); Assert.assertNotNull(registerResponse); + lastResponseId = 0; } } @@ -629,6 +682,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { RegisterApplicationMasterResponse registerResponse = interceptor.registerApplicationMaster(registerReq); Assert.assertNotNull(registerResponse); + lastResponseId = 0; // Register the application second time with a different request obj registerReq = Records.newRecord(RegisterApplicationMasterRequest.class); @@ -637,6 +691,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { registerReq.setTrackingUrl("different"); try { registerResponse = interceptor.registerApplicationMaster(registerReq); + lastResponseId = 0; Assert.fail("Should throw if a different request obj is used"); } catch (YarnException e) { } @@ -689,20 +744,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { @Test public void testSecondAttempt() throws Exception { - ApplicationUserInfo userInfo = getApplicationUserInfo(testAppId); - userInfo.getUser().doAs(new PrivilegedExceptionAction<Object>() { + final RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + registerReq.setRpcPort(testAppId); + registerReq.setTrackingUrl(""); + + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { // Register the application - RegisterApplicationMasterRequest registerReq = - Records.newRecord(RegisterApplicationMasterRequest.class); - registerReq.setHost(Integer.toString(testAppId)); - registerReq.setRpcPort(testAppId); - registerReq.setTrackingUrl(""); - RegisterApplicationMasterResponse registerResponse = interceptor.registerApplicationMaster(registerReq); Assert.assertNotNull(registerResponse); + lastResponseId = 0; Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); @@ -714,10 +771,13 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { List<Container> containers = getContainersAndAssert(numberOfContainers, numberOfContainers * 2); for (Container c : containers) { - System.out.println(c.getId() + " ha"); + LOG.info("Allocated container " + c.getId()); } Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + // Make sure all async hb threads are done + interceptor.drainAllAsyncQueue(true); + // Preserve the mock RM instances for secondaries ConcurrentHashMap<String, MockResourceManagerFacade> secondaries = interceptor.getSecondaryRMs(); @@ -729,8 +789,20 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor = new TestableFederationInterceptor(null, secondaries); interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, getConf(), attemptId, "test-user", null, null, null, registry)); - registerResponse = interceptor.registerApplicationMaster(registerReq); + return null; + } + }); + // Update the ugi with new attemptId + ugi = interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + lastResponseId = 0; + + int numberOfContainers = 3; // Should re-attach secondaries and get the three running containers Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(numberOfContainers, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org