Repository: hadoop Updated Branches: refs/heads/YARN-7402 3671dc3ef -> 1ca57be32
YARN-8862. [GPG] Add Yarn Registry cleanup in ApplicationCleaner. Contributed by Botong Huang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1ca57be3 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1ca57be3 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1ca57be3 Branch: refs/heads/YARN-7402 Commit: 1ca57be3208bc351d428ed2ab13ca34b249f2101 Parents: 3671dc3 Author: Botong Huang <[email protected]> Authored: Thu Oct 18 10:26:16 2018 -0700 Committer: Botong Huang <[email protected]> Committed: Thu Oct 18 10:26:16 2018 -0700 ---------------------------------------------------------------------- .../utils/FederationRegistryClient.java | 18 +++++++---- .../utils/TestFederationRegistryClient.java | 31 ++++++++++++++++++- .../globalpolicygenerator/GPGContext.java | 5 +++ .../globalpolicygenerator/GPGContextImpl.java | 12 ++++++++ .../GlobalPolicyGenerator.java | 21 +++++++++++++ .../applicationcleaner/ApplicationCleaner.java | 19 +++++++++++- .../DefaultApplicationCleaner.java | 2 ++ .../TestDefaultApplicationCleaner.java | 32 ++++++++++++++++++++ .../amrmproxy/FederationInterceptor.java | 6 ++-- 9 files changed, 136 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ca57be3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java index 6624318..7eb9049 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationRegistryClient.java @@ -202,21 +202,27 @@ public class FederationRegistryClient { * Remove an application from registry. * * @param appId application id + * @param ignoreMemoryState whether to ignore the memory data in terms of + * known application */ - public void removeAppFromRegistry(ApplicationId appId) { + public void removeAppFromRegistry(ApplicationId appId, + boolean ignoreMemoryState) { Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap = this.appSubClusterTokenMap.get(appId); - LOG.info("Removing all registry entries for {}", appId); - - if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) { - return; + if (!ignoreMemoryState) { + if (subClusterTokenMap == null || subClusterTokenMap.size() == 0) { + return; + } } + LOG.info("Removing all registry entries for {}", appId); // Lastly remove the application directory String key = getRegistryKey(appId, null); try { removeKeyRegistry(this.registry, this.user, key, true, true); - subClusterTokenMap.clear(); + if (subClusterTokenMap != null) { + subClusterTokenMap.clear(); + } } catch (YarnException e) { LOG.error("Failed removing registry directory key " + key, e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ca57be3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java index 42be851..5b799a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationRegistryClient.java @@ -80,11 +80,40 @@ public class TestFederationRegistryClient { Assert.assertEquals(2, this.registryClient.loadStateFromRegistry(appId).size()); - this.registryClient.removeAppFromRegistry(appId); + this.registryClient.removeAppFromRegistry(appId, false); Assert.assertEquals(0, this.registryClient.getAllApplications().size()); Assert.assertEquals(0, this.registryClient.loadStateFromRegistry(appId).size()); } + @Test + public void testRemoveWithMemoryState() { + ApplicationId appId1 = ApplicationId.newInstance(0, 0); + ApplicationId appId2 = ApplicationId.newInstance(0, 1); + String scId0 = "subcluster0"; + + this.registryClient.writeAMRMTokenForUAM(appId1, scId0, + new Token<AMRMTokenIdentifier>()); + this.registryClient.writeAMRMTokenForUAM(appId2, scId0, + new Token<AMRMTokenIdentifier>()); + Assert.assertEquals(2, this.registryClient.getAllApplications().size()); + + // Create a new client instance + this.registryClient = + new FederationRegistryClient(this.conf, this.registry, this.user); + + this.registryClient.loadStateFromRegistry(appId2); + // Should remove app2 + this.registryClient.removeAppFromRegistry(appId2, false); + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + + // Should not remove app1 since memory state don't have it + this.registryClient.removeAppFromRegistry(appId1, false); + Assert.assertEquals(1, this.registryClient.getAllApplications().size()); + + // Should remove app1 + this.registryClient.removeAppFromRegistry(appId1, true); + Assert.assertEquals(0, this.registryClient.getAllApplications().size()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ca57be3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java index 6b0a5a4..e54244d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; /** @@ -32,4 +33,8 @@ public interface GPGContext { GPGPolicyFacade getPolicyFacade(); void setPolicyFacade(GPGPolicyFacade facade); + + FederationRegistryClient getRegistryClient(); + + void setRegistryClient(FederationRegistryClient client); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ca57be3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java index bb49844..b14f502 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; /** @@ -27,6 +28,7 @@ public class GPGContextImpl implements GPGContext { private FederationStateStoreFacade facade; private GPGPolicyFacade policyFacade; + private FederationRegistryClient registryClient; @Override public FederationStateStoreFacade getStateStoreFacade() { @@ -48,4 +50,14 @@ public class GPGContextImpl implements GPGContext { public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade){ policyFacade = gpgPolicyfacade; } + + @Override + public FederationRegistryClient getRegistryClient() { + return registryClient; + } + + @Override + public void setRegistryClient(FederationRegistryClient client) { + registryClient = client; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ca57be3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index c8ec4cd..4444e4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -25,11 +25,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner; import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator; @@ -60,6 +63,7 @@ public class GlobalPolicyGenerator extends CompositeService { // Federation Variables private GPGContext gpgContext; + private RegistryOperations registry; // Scheduler service that runs tasks periodically private ScheduledThreadPoolExecutor scheduledExecutorService; @@ -81,6 +85,17 @@ public class GlobalPolicyGenerator extends CompositeService { .setPolicyFacade(new GPGPolicyFacade( this.gpgContext.getStateStoreFacade(), conf)); + this.registry = FederationStateStoreFacade.createInstance(conf, + YarnConfiguration.YARN_REGISTRY_CLASS, + YarnConfiguration.DEFAULT_YARN_REGISTRY_CLASS, + RegistryOperations.class); + this.registry.init(conf); + + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + FederationRegistryClient registryClient = + new FederationRegistryClient(conf, this.registry, user); + this.gpgContext.setRegistryClient(registryClient); + this.scheduledExecutorService = new ScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); @@ -105,6 +120,8 @@ public class GlobalPolicyGenerator extends CompositeService { protected void serviceStart() throws Exception { super.serviceStart(); + this.registry.start(); + // Schedule SubClusterCleaner service long scCleanerIntervalMs = getConfig().getLong( YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, @@ -141,6 +158,10 @@ public class GlobalPolicyGenerator extends CompositeService { @Override protected void serviceStop() throws Exception { + if (this.registry != null) { + this.registry.stop(); + } + try { if (this.scheduledExecutorService != null && !this.scheduledExecutorService.isShutdown()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ca57be3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java index 85047ef..86eb536 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.commons.lang3.time.DurationFormatUtils; @@ -27,9 +28,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -46,6 +49,7 @@ public abstract class ApplicationCleaner implements Runnable { private Configuration conf; private GPGContext gpgContext; + private FederationRegistryClient registryClient; private int minRouterSuccessCount; private int maxRouterRetry; @@ -56,6 +60,7 @@ public abstract class ApplicationCleaner implements Runnable { this.gpgContext = context; this.conf = config; + this.registryClient = context.getRegistryClient(); String routerSpecString = this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, @@ -102,7 +107,7 @@ public abstract class ApplicationCleaner implements Runnable { LOG.info(String.format("Contacting router at: %s", webAppAddress)); AppsInfo appsInfo = (AppsInfo) GPGUtils.invokeRMWebService(conf, - webAppAddress, "apps", AppsInfo.class, + webAppAddress, RMWSConsts.APPS, AppsInfo.class, DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString()); Set<ApplicationId> appSet = new HashSet<ApplicationId>(); @@ -149,6 +154,18 @@ public abstract class ApplicationCleaner implements Runnable { + " success Router queries after " + totalAttemptCount + " retries"); } + protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) { + List<String> allApps = this.registryClient.getAllApplications(); + LOG.info("Got " + allApps.size() + " existing apps in registry"); + for (String app : allApps) { + ApplicationId appId = ApplicationId.fromString(app); + if (!knownApps.contains(appId)) { + LOG.info("removing finished application entry for {}", app); + this.registryClient.removeAppFromRegistry(appId, true); + } + } + } + @Override public abstract void run(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ca57be3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java index 1ce9840..3c67638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java @@ -74,6 +74,8 @@ public class DefaultApplicationCleaner extends ApplicationCleaner { } } + // Clean up registry entries + cleanupAppRecordInRegistry(routerApps); } catch (Throwable e) { LOG.error("Application cleaner started at time " + now + " fails: ", e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ca57be3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java index ec3f64e..10d442b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java @@ -24,15 +24,21 @@ import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl; @@ -50,6 +56,8 @@ public class TestDefaultApplicationCleaner { private FederationStateStoreFacade facade; private ApplicationCleaner appCleaner; private GPGContext gpgContext; + private RegistryOperations registry; + private FederationRegistryClient registryClient; private List<ApplicationId> appIds; // The list of applications returned by mocked router @@ -68,8 +76,18 @@ public class TestDefaultApplicationCleaner { facade = FederationStateStoreFacade.getInstance(); facade.reinitialize(stateStore, conf); + registry = new FSRegistryOperationsService(); + registry.init(conf); + registry.start(); + + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + registryClient = new FederationRegistryClient(conf, registry, user); + registryClient.cleanAllApplications(); + Assert.assertEquals(0, registryClient.getAllApplications().size()); + gpgContext = new GPGContextImpl(); gpgContext.setStateStoreFacade(facade); + gpgContext.setRegistryClient(registryClient); appCleaner = new TestableDefaultApplicationCleaner(); appCleaner.init(conf, gpgContext); @@ -87,7 +105,12 @@ public class TestDefaultApplicationCleaner { stateStore.addApplicationHomeSubCluster( AddApplicationHomeSubClusterRequest.newInstance( ApplicationHomeSubCluster.newInstance(appId, subClusterId))); + + // Write some registry entries for the app + registryClient.writeAMRMTokenForUAM(appId, subClusterId.toString(), + new Token<AMRMTokenIdentifier>()); } + Assert.assertEquals(3, registryClient.getAllApplications().size()); } @After @@ -95,6 +118,12 @@ public class TestDefaultApplicationCleaner { if (stateStore != null) { stateStore.close(); } + if (registryClient != null) { + registryClient.cleanAllApplications(); + } + if (registry != null) { + registry.stop(); + } } @Test @@ -115,6 +144,9 @@ public class TestDefaultApplicationCleaner { .getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest.newInstance()) .getAppsHomeSubClusters().size()); + + // The known app should not be cleaned in registry + Assert.assertEquals(1, registryClient.getAllApplications().size()); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ca57be3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 1bf882f..e527676 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.PreemptionContract; import org.apache.hadoop.yarn.api.records.PreemptionMessage; @@ -724,13 +725,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (failedToUnRegister) { homeResponse.setIsUnregistered(false); - } else { + } else if (request.getFinalApplicationStatus() + .equals(FinalApplicationStatus.SUCCEEDED)) { // Clean up UAMs only when the app finishes successfully, so that no more // attempt will be launched. this.uamPool.stop(); if (this.registryClient != null) { this.registryClient - .removeAppFromRegistry(this.attemptId.getApplicationId()); + .removeAppFromRegistry(this.attemptId.getApplicationId(), false); } } return homeResponse; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
