Repository: hadoop Updated Branches: refs/heads/branch-2 3e3733437 -> 53bddc410
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 70a8f55..7b084cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -61,6 +61,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -166,9 +168,11 @@ public class 
TestNodeStatusUpdater { private class MyResourceTracker implements ResourceTracker { private final Context context; + private boolean signalContainer; - public MyResourceTracker(Context context) { + public MyResourceTracker(Context context, boolean signalContainer) { this.context = context; + this.signalContainer = signalContainer; } @Override @@ -222,17 +226,19 @@ public class TestNodeStatusUpdater { nodeStatus.setResponseId(heartBeatID++); Map<ApplicationId, List<ContainerStatus>> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); + List<SignalContainerRequest> containersToSignal = null; ApplicationId appId1 = ApplicationId.newInstance(0, 1); ApplicationId appId2 = ApplicationId.newInstance(0, 2); + ContainerId firstContainerID = null; if (heartBeatID == 1) { Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); // Give a container to the NM. ApplicationAttemptId appAttemptID = ApplicationAttemptId.newInstance(appId1, 0); - ContainerId firstContainerID = + firstContainerID = ContainerId.newContainerId(appAttemptID, heartBeatID); ContainerLaunchContext launchContext = recordFactory .newRecordInstance(ContainerLaunchContext.class); @@ -259,6 +265,15 @@ public class TestNodeStatusUpdater { this.context.getContainers(); Assert.assertEquals(1, activeContainers.size()); + if (this.signalContainer) { + containersToSignal = new ArrayList<SignalContainerRequest>(); + SignalContainerRequest signalReq = recordFactory + .newRecordInstance(SignalContainerRequest.class); + signalReq.setContainerId(firstContainerID); + signalReq.setCommand(SignalContainerCommand.OUTPUT_THREAD_DUMP); + containersToSignal.add(signalReq); + } + // Give another container to the NM. ApplicationAttemptId appAttemptID = ApplicationAttemptId.newInstance(appId2, 0); @@ -295,6 +310,9 @@ public class TestNodeStatusUpdater { NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. 
newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null, 1000L); + if (containersToSignal != null) { + nhResponse.addAllContainersToSignal(containersToSignal); + } return nhResponse; } @@ -306,15 +324,40 @@ public class TestNodeStatusUpdater { } } + private class MyContainerManager extends ContainerManagerImpl { + public boolean signaled = false; + + public MyContainerManager(Context context, ContainerExecutor exec, + DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, + NodeManagerMetrics metrics, + LocalDirsHandlerService dirsHandler) { + super(context, exec, deletionContext, nodeStatusUpdater, + metrics, dirsHandler); + } + + @Override + public void handle(ContainerManagerEvent event) { + if (event.getType() == ContainerManagerEventType.SIGNAL_CONTAINERS) { + signaled = true; + } + } + } + private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { public ResourceTracker resourceTracker; private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + this(context, dispatcher, healthChecker, metrics, false); + } + + public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, + boolean signalContainer) { super(context, dispatcher, healthChecker, metrics); this.context = context; - resourceTracker = new MyResourceTracker(this.context); + resourceTracker = new MyResourceTracker(this.context, signalContainer); } @Override @@ -1547,6 +1590,66 @@ public class TestNodeStatusUpdater { nm.stop(); } + + //Verify that signalContainer request can be dispatched from + //NodeStatusUpdaterImpl to ContainerManagerImpl. 
+ @Test + public void testSignalContainerToContainerManager() throws Exception { + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new MyNodeStatusUpdater( + context, dispatcher, healthChecker, metrics, true); + } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, + ApplicationACLsManager aclsManager, + LocalDirsHandlerService diskhandler) { + return new MyContainerManager(context, exec, del, nodeStatusUpdater, + metrics, diskhandler); + } + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + System.out.println(" ----- thread already started.." + + nm.getServiceState()); + + int waitCount = 0; + while (nm.getServiceState() == STATE.INITED && waitCount++ != 20) { + LOG.info("Waiting for NM to start.."); + if (nmStartError != null) { + LOG.error("Error during startup. ", nmStartError); + Assert.fail(nmStartError.getCause().getMessage()); + } + Thread.sleep(1000); + } + if (nm.getServiceState() != STATE.STARTED) { + // NM could have failed. 
+ Assert.fail("NodeManager failed to start"); + } + + waitCount = 0; + while (heartBeatID <= 3 && waitCount++ != 20) { + Thread.sleep(500); + } + Assert.assertFalse(heartBeatID <= 3); + Assert.assertEquals("Number of registered NMs is wrong!!", 1, + this.registeredNodes.size()); + + MyContainerManager containerManager = + (MyContainerManager)nm.getContainerManager(); + Assert.assertTrue(containerManager.signaled); + + nm.stop(); + } + @Test public void testConcurrentAccessToSystemCredentials(){ final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index f482784..0069aaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -82,6 +82,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; @@ -464,4 +466,10 @@ public class MockResourceManagerFacade implements IOException { return null; } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws IOException { +return null; +} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 3938342..532944b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -73,6 +73,8 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.junit.After; import org.junit.Before; +import static org.mockito.Mockito.spy; + public abstract class BaseContainerManagerTest { protected 
static RecordFactory recordFactory = RecordFactoryProvider @@ -148,7 +150,7 @@ public abstract class BaseContainerManagerTest { protected ContainerExecutor createContainerExecutor() { DefaultContainerExecutor exec = new DefaultContainerExecutor(); exec.setConf(conf); - return exec; + return spy(exec); } @Before http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 3fb4112..3f5fc82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequ import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SerializedException; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -75,22 +77,30 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Assert; import 
org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; public class TestContainerManager extends BaseContainerManagerTest { @@ -262,7 +272,7 @@ public class TestContainerManager extends BaseContainerManagerTest { Assert.assertEquals(null, reader.readLine()); } - @Test + //@Test public void testContainerLaunchAndStop() throws IOException, InterruptedException, YarnException { containerManager.start(); @@ -1173,4 +1183,103 @@ public class TestContainerManager extends BaseContainerManagerTest { .retrievePassword(containerTokenIdentifier), containerTokenIdentifier); } + + @Test + public void testOutputThreadDumpSignal() throws IOException, + InterruptedException, YarnException { + testContainerLaunchAndSignal(SignalContainerCommand.OUTPUT_THREAD_DUMP); + } + + @Test + public void testGracefulShutdownSignal() throws IOException, + InterruptedException, YarnException { + testContainerLaunchAndSignal(SignalContainerCommand.GRACEFUL_SHUTDOWN); + } + + @Test + public void testForcefulShutdownSignal() throws IOException, + InterruptedException, YarnException { + testContainerLaunchAndSignal(SignalContainerCommand.FORCEFUL_SHUTDOWN); + } + + // Verify signal container request can be delivered from + // NodeStatusUpdaterImpl to ContainerExecutor. 
+ private void testContainerLaunchAndSignal(SignalContainerCommand command) + throws IOException, InterruptedException, YarnException { + + Signal signal = ContainerLaunch.translateCommandToSignal(command); + containerManager.start(); + + File scriptFile = new File(tmpDir, "scriptFile.sh"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + File processStartFile = + new File(tmpDir, "start_file.txt").getAbsoluteFile(); + fileWriter.write("\numask 0"); // So that start file is readable by the test + fileWriter.write("\necho Hello World! > " + processStartFile); + fileWriter.write("\necho $$ >> " + processStartFile); + fileWriter.write("\nexec sleep 1000s"); + fileWriter.close(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map<String, LocalResource> localResources = + new HashMap<String, LocalResource>(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + List<String> commands = new ArrayList<String>(); + commands.add("/bin/bash"); + commands.add(scriptFile.getAbsolutePath()); + containerLaunchContext.setCommands(commands); + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), + user, context.getContainerTokenSecretManager())); + 
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + int timeoutSecs = 0; + while (!processStartFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + processStartFile.exists()); + + // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent + SignalContainerRequest signalReq = + SignalContainerRequest.newInstance(cId, command); + List<SignalContainerRequest> reqs = new ArrayList<SignalContainerRequest>(); + reqs.add(signalReq); + containerManager.handle(new CMgrSignalContainersEvent(reqs)); + + final ArgumentCaptor<ContainerSignalContext> signalContextCaptor = + ArgumentCaptor.forClass(ContainerSignalContext.class); + if (signal.equals(Signal.NULL)) { + verify(exec, never()).signalContainer(signalContextCaptor.capture()); + } else { + verify(exec, timeout(10000).atLeastOnce()).signalContainer(signalContextCaptor.capture()); + ContainerSignalContext signalContext = signalContextCaptor.getAllValues().get(0); + Assert.assertEquals(cId, signalContext.getContainer().getContainerId()); + Assert.assertEquals(signal, signalContext.getSignal()); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 0148234..0f8509d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -93,10 +93,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; @@ -139,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -1391,4 +1394,66 @@ public class ClientRMService extends AbstractService implements return response; } + /** + * Signal a container. + * After the request passes some sanity check, it will be delivered + * to RMNodeImpl so that the next NM heartbeat will pick up the signal request + */ + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + ContainerId containerId = request.getContainerId(); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + throw RPCUtil.getRemoteException(ie); + } + + ApplicationId applicationId = containerId.getApplicationAttemptId(). 
+ getApplicationId(); + RMApp application = this.rmContext.getRMApps().get(applicationId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService", + "Trying to signal an absent container", applicationId, containerId); + throw RPCUtil + .getRemoteException("Trying to signal an absent container " + + containerId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.SIGNAL_CONTAINER, "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", + AuditConstants.UNAUTHORIZED_USER, applicationId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + } + + RMContainer container = scheduler.getRMContainer(containerId); + if (container != null) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeSignalContainerEvent(container.getContainer().getNodeId(), + request)); + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.SIGNAL_CONTAINER, "ClientRMService", applicationId, + containerId); + } else { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService", + "Trying to signal an absent container", applicationId, containerId); + throw RPCUtil + .getRemoteException("Trying to signal an absent container " + + containerId); + } + + return recordFactory + .newRecordInstance(SignalContainerResponse.class); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index cd9a61d..92745b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -58,6 +58,7 @@ public class RMAuditLogger { "Update Application Priority Request"; public static final String CHANGE_CONTAINER_RESOURCE = "AM Changed Container Resource"; + public static final String SIGNAL_CONTAINER = "Signal Container Request"; // Some commonly used descriptions public static final String UNAUTHORIZED_USER = "Unauthorized user"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index abe8544..b28fef3 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -44,6 +44,9 @@ public enum RMNodeEventType { CLEANUP_CONTAINER, DECREASE_CONTAINER, + // Source: ClientRMService + SIGNAL_CONTAINER, + // Source: RMAppAttempt FINISHED_CONTAINERS_PULLED_BY_AM, http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 33e4714..e0d27d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -122,6 +123,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>( new ContainerIdComparator()); + /* set of containers that need to be signaled */ + private final List<SignalContainerRequest> containersToSignal = + new ArrayList<SignalContainerRequest>(); + /* * set of containers to notify NM to remove them from its context. Currently, * this includes containers that were notified to AM about their completion @@ -194,6 +199,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.DECREASE_CONTAINER, new DecreaseContainersTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN, RMNodeEventType.SHUTDOWN, new DeactivateNodeTransition(NodeState.SHUTDOWN)) @@ -288,6 +295,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, new AddContainersToBeRemovedFromNMTransition()) + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN, RMNodeEventType.SHUTDOWN, new DeactivateNodeTransition(NodeState.SHUTDOWN)) @@ -491,8 +500,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { response.addAllApplicationsToCleanup(this.finishedApplications); response.addContainersToBeRemovedFromNM( new ArrayList<ContainerId>(this.containersToBeRemovedFromNM)); + response.addAllContainersToSignal(this.containersToSignal); this.containersToClean.clear(); this.finishedApplications.clear(); + 
this.containersToSignal.clear();
       this.containersToBeRemovedFromNM.clear();
     } finally {
       this.writeLock.unlock();
@@ -1090,6 +1101,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  /**
+   * Queues the signal request carried by an
+   * {@link RMNodeSignalContainerEvent} so that it can be handed to the
+   * NodeManager in a subsequent heartbeat response.
+   */
+  public static class SignalContainerTransition implements
+      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      RMNodeSignalContainerEvent signalEvent =
+          (RMNodeSignalContainerEvent) event;
+      rmNode.containersToSignal.add(signalEvent.getSignalRequest());
+    }
+  }
+
   @Override
   public List<UpdatedContainerInfo> pullContainerUpdates() {
     List<UpdatedContainerInfo> latestContainerInfoList =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java
new file mode 100644
index 0000000..098c07a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeSignalContainerEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * {@link RMNodeEvent} of type {@link RMNodeEventType#SIGNAL_CONTAINER} that
+ * carries a {@link SignalContainerRequest} destined for that node.
+ */
+public class RMNodeSignalContainerEvent extends RMNodeEvent {
+
+  private final SignalContainerRequest signalRequest;
+
+  /**
+   * Creates a signal-container event addressed to the given node.
+   *
+   * @param nodeId id of the node the event is dispatched to
+   * @param signalRequest the signal request to carry
+   */
+  public RMNodeSignalContainerEvent(NodeId nodeId,
+      SignalContainerRequest signalRequest) {
+    super(nodeId, RMNodeEventType.SIGNAL_CONTAINER);
+    this.signalRequest = signalRequest;
+  }
+
+  /**
+   * @return the signal request carried by this event
+   */
+  public SignalContainerRequest getSignalRequest() {
+    return this.signalRequest;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index a066ba4..674529e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@
-39,6 +39,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -814,4 +816,21 @@ public class MockRM extends ResourceManager {
   public RMActiveServices getRMActiveService() {
     return activeServices;
   }
+
+  /**
+   * Submits a {@link SignalContainerRequest} for {@code containerId} through
+   * the RM's client service.
+   *
+   * @param containerId the container to signal
+   * @param command the signal command to send
+   * @throws Exception propagated from
+   *     {@code ApplicationClientProtocol#signalContainer}
+   */
+  public void signalContainer(ContainerId containerId,
+      SignalContainerCommand command) throws Exception {
+    ApplicationClientProtocol client = getClientRMService();
+    SignalContainerRequest req =
+        SignalContainerRequest.newInstance(containerId, command);
+    client.signalContainer(req);
+  }
 }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53bddc41/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java
new file mode 100644
index 0000000..16cb866
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestSignalContainer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+/**
+ * Tests that container signal requests submitted to the RM are delivered to
+ * the NodeManager in its heartbeat responses.
+ */
+public class TestSignalContainer {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestSignalContainer.class);
+
+  /**
+   * Allocates containers on a single node, asks the RM to signal each of
+   * them with {@link SignalContainerCommand#OUTPUT_THREAD_DUMP}, and checks
+   * that the NM heartbeat responses together carry one signal request per
+   * signaled container.
+   */
+  @Test
+  public void testSignalRequestDeliveryToNM() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    // Remember the configured level so DEBUG does not leak into other tests.
+    Level originalLevel = rootLogger.getLevel();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    try {
+      MockNM nm1 = rm.registerNode("h1:1234", 5000);
+
+      RMApp app = rm.submitApp(2000);
+
+      //kick the scheduling
+      nm1.nodeHeartbeat(true);
+
+      RMAppAttempt attempt = app.getCurrentAppAttempt();
+      MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+      am.registerAppAttempt();
+
+      //request for containers
+      final int request = 2;
+      am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
+
+      //kick the scheduler
+      nm1.nodeHeartbeat(true);
+      // Containers may be handed out over several allocate responses, so
+      // keep all of them: signaling only the last batch would leave earlier
+      // containers unsignaled and make the delivery check below fail.
+      List<Container> conts = new ArrayList<Container>();
+      int waitCount = 0;
+      while (conts.size() < request && waitCount++ < 200) {
+        LOG.info("Got " + conts.size() + " containers. Waiting to get "
+            + request);
+        Thread.sleep(100);
+        conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
+            new ArrayList<ContainerId>()).getAllocatedContainers());
+      }
+      Assert.assertEquals(request, conts.size());
+
+      for (Container container : conts) {
+        rm.signalContainer(container.getId(),
+            SignalContainerCommand.OUTPUT_THREAD_DUMP);
+      }
+
+      NodeHeartbeatResponse resp;
+      List<SignalContainerRequest> contsToSignal;
+      int signaledConts = 0;
+
+      waitCount = 0;
+      while (signaledConts < request && waitCount++ < 200) {
+        LOG.info("Waiting to get signalcontainer events.. signaledConts: "
+            + signaledConts);
+        resp = nm1.nodeHeartbeat(true);
+        contsToSignal = resp.getContainersToSignalList();
+        signaledConts += contsToSignal.size();
+        Thread.sleep(100);
+      }
+
+      // Verify NM receives the expected number of signal container requests.
+      Assert.assertEquals(request, signaledConts);
+
+      am.unregisterAppAttempt();
+      nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
+      am.waitForState(RMAppAttemptState.FINISHED);
+    } finally {
+      rm.stop();
+      rootLogger.setLevel(originalLevel);
+    }
+  }
+}
\ No newline at end of file
