YARN-8142. Improve SIGTERM handling for YARN Service Application Master.
Contributed by Billie Rinaldi
Advertising
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9031a76d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9031a76d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9031a76d
Branch: refs/heads/HDFS-7240
Commit: 9031a76d447f0c5eaa392144fd17c5b9812e1b20
Parents: e66e287
Author: Eric Yang <ey...@apache.org>
Authored: Fri Apr 13 15:34:33 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Fri Apr 13 15:34:33 2018 -0400
----------------------------------------------------------------------
.../hadoop/yarn/service/ClientAMService.java | 1 +
.../hadoop/yarn/service/ServiceScheduler.java | 41 +++++++----
.../hadoop/yarn/service/ServiceTestUtils.java | 11 +++
.../yarn/service/TestYarnNativeServices.java | 71 ++++++++++++++++++++
4 files changed, 110 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9031a76d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 08c36f4..3d037e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -125,6 +125,7 @@ public class ClientAMService extends AbstractService
LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser());
context.scheduler.getDiagnostics()
.append("Stopped by user " + UserGroupInformation.getCurrentUser());
+ context.scheduler.setGracefulStop();
// Stop the service in 2 seconds delay to make sure this rpc call is completed.
// shutdown hook will be executed which will stop AM gracefully.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9031a76d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 0fcca16..7eddef9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -156,6 +156,8 @@ public class ServiceScheduler extends CompositeService {
// requests for a single service is not recommended.
private boolean hasAtLeastOnePlacementConstraint;
+ private boolean gracefulStop = false;
+
public ServiceScheduler(ServiceContext context) {
super(context.service.getName());
this.context = context;
@@ -199,6 +201,7 @@ public class ServiceScheduler extends CompositeService {
addIfService(amRMClient);
nmClient = createNMClient();
+ nmClient.getClient().cleanupRunningContainersOnStop(false);
addIfService(nmClient);
dispatcher = new AsyncDispatcher("Component dispatcher");
@@ -252,6 +255,11 @@ public class ServiceScheduler extends CompositeService {
.createAMRMClientAsync(1000, new AMRMClientCallback());
}
+ protected void setGracefulStop() {
+ this.gracefulStop = true;
+ nmClient.getClient().cleanupRunningContainersOnStop(true);
+ }
+
@Override
public void serviceInit(Configuration conf) throws Exception {
try {
@@ -266,26 +274,31 @@ public class ServiceScheduler extends CompositeService {
public void serviceStop() throws Exception {
LOG.info("Stopping service scheduler");
- // Mark component-instances/containers as STOPPED
- if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
- for (ContainerId containerId : getLiveInstances().keySet()) {
- serviceTimelinePublisher.componentInstanceFinished(containerId,
- KILLED_AFTER_APP_COMPLETION, diagnostics.toString());
- }
- }
if (executorService != null) {
executorService.shutdownNow();
}
DefaultMetricsSystem.shutdown();
- if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
- serviceTimelinePublisher
- .serviceAttemptUnregistered(context, diagnostics.toString());
+
+ // only stop the entire service when a graceful stop has been initiated
+ // (e.g. via client RPC, not through the AM receiving a SIGTERM)
+ if (gracefulStop) {
+ if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+ // mark component-instances/containers as STOPPED
+ for (ContainerId containerId : getLiveInstances().keySet()) {
+ serviceTimelinePublisher.componentInstanceFinished(containerId,
+ KILLED_AFTER_APP_COMPLETION, diagnostics.toString());
+ }
+ // mark attempt as unregistered
+ serviceTimelinePublisher
+ .serviceAttemptUnregistered(context, diagnostics.toString());
+ }
+ // unregister AM
+ amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED,
+ diagnostics.toString(), "");
+ LOG.info("Service {} unregistered with RM, with attemptId = {} " +
+ ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics);
}
- amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED,
- diagnostics.toString(), "");
- LOG.info("Service {} unregistered with RM, with attemptId = {} " +
- ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics);
super.serviceStop();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9031a76d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index d84f05f..599b8a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.registry.client.impl.zk.CuratorService;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.api.records.Component;
@@ -305,6 +306,16 @@ public class ServiceTestUtils {
return client;
}
+ /**
+ * Creates a YarnClient for test purposes.
+ */
+ public static YarnClient createYarnClient(Configuration conf) {
+ YarnClient client = YarnClient.createYarnClient();
+ client.init(conf);
+ client.start();
+ return client;
+ }
+
protected CuratorService getCuratorService() throws IOException {
return curatorService;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9031a76d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
index 5e267bb..443ba0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -489,6 +490,76 @@ public class TestYarnNativeServices extends ServiceTestUtils {
client.actionDestroy(exampleApp.getName());
}
+ @Test(timeout = 200000)
+ public void testAMSigtermDoesNotKillApplication() throws Exception {
+ runAMSignalTest(SignalContainerCommand.GRACEFUL_SHUTDOWN);
+ }
+
+ @Test(timeout = 200000)
+ public void testAMSigkillDoesNotKillApplication() throws Exception {
+ runAMSignalTest(SignalContainerCommand.FORCEFUL_SHUTDOWN);
+ }
+
+ public void runAMSignalTest(SignalContainerCommand signal) throws Exception {
+ setupInternal(NUM_NMS);
+ ServiceClient client = createClient(getConf());
+ Service exampleApp = createExampleApplication();
+ client.actionCreate(exampleApp);
+ waitForServiceToBeStable(client, exampleApp);
+ Service appStatus1 = client.getStatus(exampleApp.getName());
+ ApplicationId exampleAppId = ApplicationId.fromString(appStatus1.getId());
+
+ YarnClient yarnClient = createYarnClient(getConf());
+ ApplicationReport applicationReport = yarnClient.getApplicationReport(
+ exampleAppId);
+
+ ApplicationAttemptId firstAttemptId = applicationReport
+ .getCurrentApplicationAttemptId();
+ ApplicationAttemptReport attemptReport = yarnClient
+ .getApplicationAttemptReport(firstAttemptId);
+
+ // the AM should not perform a graceful shutdown since the operation was not
+ // initiated through the service client
+ yarnClient.signalToContainer(attemptReport.getAMContainerId(), signal);
+
+ GenericTestUtils.waitFor(() -> {
+ try {
+ ApplicationReport ar = client.getYarnClient()
+ .getApplicationReport(exampleAppId);
+ YarnApplicationState state = ar.getYarnApplicationState();
+ Assert.assertTrue(state == YarnApplicationState.RUNNING ||
+ state == YarnApplicationState.ACCEPTED);
+ if (state != YarnApplicationState.RUNNING) {
+ return false;
+ }
+ if (ar.getCurrentApplicationAttemptId() == null ||
+ ar.getCurrentApplicationAttemptId().equals(firstAttemptId)) {
+ return false;
+ }
+ Service appStatus2 = client.getStatus(exampleApp.getName());
+ if (appStatus2.getState() != ServiceState.STABLE) {
+ return false;
+ }
+ Assert.assertEquals(getSortedContainerIds(appStatus1).toString(),
+ getSortedContainerIds(appStatus2).toString());
+ return true;
+ } catch (YarnException | IOException e) {
+ throw new RuntimeException("while waiting", e);
+ }
+ }, 2000, 200000);
+ }
+
+ private static List<String> getSortedContainerIds(Service s) {
+ List<String> containerIds = new ArrayList<>();
+ for (Component component : s.getComponents()) {
+ for (Container container : component.getContainers()) {
+ containerIds.add(container.getId());
+ }
+ }
+ Collections.sort(containerIds);
+ return containerIds;
+ }
+
// Check containers launched are in dependency order
// Get all containers into a list and sort based on container launch time e.g.
// compa-c1, compa-c2, compb-c1, compb-c2;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org