YARN-1897. CLI and core support for signal container functionality. Contributed by Ming Ma
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8f08532b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8f08532b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8f08532b Branch: refs/heads/trunk Commit: 8f08532bde153811368e1b8336446fba4743f9d2 Parents: fdf02d1 Author: Xuan <[email protected]> Authored: Fri Oct 2 18:50:47 2015 -0700 Committer: Xuan <[email protected]> Committed: Fri Oct 2 18:50:47 2015 -0700 ---------------------------------------------------------------------- .../hadoop/mapred/ResourceMgrDelegate.java | 7 + .../hadoop/mapred/TestClientRedirect.java | 8 + hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/api/ApplicationClientProtocol.java | 30 ++++ .../protocolrecords/SignalContainerRequest.java | 78 +++++++++ .../SignalContainerResponse.java | 36 ++++ .../api/records/SignalContainerCommand.java | 45 +++++ .../main/proto/applicationclient_protocol.proto | 1 + .../src/main/proto/yarn_protos.proto | 7 + .../src/main/proto/yarn_service_protos.proto | 8 + .../hadoop/yarn/client/api/YarnClient.java | 17 +- .../yarn/client/api/impl/YarnClientImpl.java | 13 +- .../hadoop/yarn/client/cli/ApplicationCLI.java | 35 ++++ .../apache/hadoop/yarn/client/cli/YarnCLI.java | 1 + .../yarn/client/api/impl/TestYarnClient.java | 25 +++ .../hadoop/yarn/client/cli/TestYarnCLI.java | 30 +++- .../ApplicationClientProtocolPBClientImpl.java | 18 ++ .../ApplicationClientProtocolPBServiceImpl.java | 19 +++ .../impl/pb/SignalContainerRequestPBImpl.java | 169 +++++++++++++++++++ .../impl/pb/SignalContainerResponsePBImpl.java | 44 +++++ .../protocolrecords/NodeHeartbeatResponse.java | 3 + .../impl/pb/NodeHeartbeatResponsePBImpl.java | 78 ++++++++- .../yarn_server_common_service_protos.proto | 2 + .../nodemanager/CMgrSignalContainersEvent.java | 37 ++++ .../nodemanager/ContainerManagerEventType.java | 3 +- .../nodemanager/NodeStatusUpdaterImpl.java | 11 ++ 
.../containermanager/ContainerManagerImpl.java | 20 +++ .../launcher/ContainerLaunch.java | 95 +++++++++++ .../launcher/ContainersLauncher.java | 18 +- .../launcher/ContainersLauncherEventType.java | 1 + .../launcher/SignalContainersLauncherEvent.java | 38 +++++ .../TestContainerManagerWithLCE.java | 36 ++++ .../nodemanager/TestNodeStatusUpdater.java | 109 +++++++++++- .../amrmproxy/MockResourceManagerFacade.java | 8 + .../BaseContainerManagerTest.java | 4 +- .../containermanager/TestContainerManager.java | 111 +++++++++++- .../server/resourcemanager/ClientRMService.java | 69 +++++++- .../server/resourcemanager/RMAuditLogger.java | 1 + .../resourcemanager/rmnode/RMNodeEventType.java | 3 + .../resourcemanager/rmnode/RMNodeImpl.java | 21 +++ .../rmnode/RMNodeSignalContainerEvent.java | 38 +++++ .../yarn/server/resourcemanager/MockRM.java | 10 ++ .../resourcemanager/TestSignalContainer.java | 113 +++++++++++++ 43 files changed, 1403 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 91c3086..29266d5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -63,6 +63,7 @@ import 
org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.ClientRMProxy; @@ -473,4 +474,10 @@ public class ResourceMgrDelegate extends YarnClient { Priority priority) throws YarnException, IOException { client.updateApplicationPriority(applicationId, priority); } + + @Override + public void signalContainer(ContainerId containerId, SignalContainerCommand command) + throws YarnException, IOException { + client.signalContainer(containerId, command); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 1bf1408..8febec6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -112,6 +112,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; @@ -453,6 +455,12 @@ public class TestClientRedirect { IOException { return null; } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws IOException { + return null; + } } class HistoryService extends AMService implements HSClientProtocol { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9e9522c..bd38c2d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -222,6 +222,9 @@ Release 2.8.0 - UNRELEASED YARN-1651. CapacityScheduler side changes to support container resize. (Wangda Tan via jianhe) + + YARN-1897. CLI and core support for signal container functionality. 
+ (Ming Ma via xgong) IMPROVEMENTS http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 08fc289..bcd3ef6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -53,6 +53,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -437,4 +439,32 @@ public interface ApplicationClientProtocol extends ApplicationBaseProtocol { public UpdateApplicationPriorityResponse updateApplicationPriority( UpdateApplicationPriorityRequest request) throws YarnException, IOException; + + /** + * <p>The interface used by clients to request the + * <code>ResourceManager</code> to signal a container. 
For example, + * the client can send command OUTPUT_THREAD_DUMP to dump threads of the + * container.</p> + * + * <p>The client, via {@link SignalContainerRequest} provides the + * id of the container and the signal command. </p> + * + * <p> In secure mode, the <code>ResourceManager</code> verifies access to the + * application before signaling the container. + * The user needs to have <code>MODIFY_APP</code> permission.</p> + * + * <p>Currently, the <code>ResourceManager</code> returns an empty response + * on success and throws an exception on rejecting the request.</p> + * + * @param request request to signal a container + * @return <code>ResourceManager</code> returns an empty response + * on success and throws an exception on rejecting the request + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, + IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java new file mode 100644 index 0000000..2a3861a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerRequest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.util.Records; + +/** + * <p>The request sent by the client to the <code>ResourceManager</code> + * or by the <code>ApplicationMaster</code> to the <code>NodeManager</code> + * to signal a container. + * @see SignalContainerCommand </p> + */ +@Public +@Evolving +public abstract class SignalContainerRequest { + + @Public + @Unstable + public static SignalContainerRequest newInstance(ContainerId containerId, + SignalContainerCommand signalContainerCommand) { + SignalContainerRequest request = + Records.newRecord(SignalContainerRequest.class); + request.setContainerId(containerId); + request.setCommand(signalContainerCommand); + return request; + } + + /** + * Get the <code>ContainerId</code> of the container to signal. + * @return <code>ContainerId</code> of the container to signal. + */ + @Public + @Unstable + public abstract ContainerId getContainerId(); + + /** + * Set the <code>ContainerId</code> of the container to signal. 
+ */ + @Public + @Unstable + public abstract void setContainerId(ContainerId containerId); + + /** + * Get the <code>SignalContainerCommand</code> of the signal request. + * @return <code>SignalContainerCommand</code> of the signal request. + */ + @Public + @Unstable + public abstract SignalContainerCommand getCommand(); + + /** + * Set the <code>SignalContainerCommand</code> of the signal request. + */ + @Public + @Unstable + public abstract void setCommand(SignalContainerCommand command); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java new file mode 100644 index 0000000..0d773b9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SignalContainerResponse.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; + +/** + * <p>The response sent by the <code>ResourceManager</code> to the client + * signalling a container.</p> + * + * <p>Currently it's empty.</p> + * + * @see ApplicationClientProtocol#signalContainer(SignalContainerRequest) + */ +@Public +@Evolving +public abstract class SignalContainerResponse { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java new file mode 100644 index 0000000..7be6dbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SignalContainerCommand.java @@ -0,0 +1,45 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Enumeration of various signal container commands. + */ +@Public +@Evolving +public enum SignalContainerCommand { + /** + * Used to capture thread dump. + * On Linux, it is equivalent to SIGQUIT. + */ + OUTPUT_THREAD_DUMP, + + /** Gracefully shutdown a container. + * On Linux, it is equivalent to SIGTERM. + */ + GRACEFUL_SHUTDOWN, + + /** Forcefully shutdown a container. + * On Linux, it is equivalent to SIGKILL. 
+ */ + FORCEFUL_SHUTDOWN, +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto index 117c930..a2ab9c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto @@ -56,4 +56,5 @@ service ApplicationClientProtocolService { rpc getLabelsToNodes (GetLabelsToNodesRequestProto) returns (GetLabelsToNodesResponseProto); rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto); + rpc signalContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 057aeee..687ee89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -417,6 +417,13 @@ message QueueUserACLInfoProto { repeated QueueACLProto userAcls = 2; } +enum SignalContainerCommandProto { + OUTPUT_THREAD_DUMP = 1; + GRACEFUL_SHUTDOWN = 2; + FORCEFUL_SHUTDOWN = 3; +} + + 
//////////////////////////////////////////////////////////////////////// ////// From reservation_protocol ///////////////////////////////////// //////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index ff5a127..15e99f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -225,6 +225,14 @@ message UpdateApplicationPriorityRequestProto { message UpdateApplicationPriorityResponseProto { } +message SignalContainerRequestProto { + required ContainerIdProto container_id = 1; + required SignalContainerCommandProto command = 2; +} + +message SignalContainerResponseProto { +} + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index ff90da1..7f6a9fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -38,8 +38,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -56,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -683,4 +682,18 @@ public abstract class YarnClient extends AbstractService { @Unstable public abstract void updateApplicationPriority(ApplicationId applicationId, Priority priority) throws YarnException, IOException; + + /** + * <p> + * Signal a container identified by given ID. 
+ * </p> + * + * @param containerId + * {@link ContainerId} of the container that needs to be signaled + * @param command the signal container command + * @throws YarnException + * @throws IOException + */ + public abstract void signalContainer(ContainerId containerId, + SignalContainerCommand command) throws YarnException, IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index bc97a12..2bc6143 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -76,9 +76,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -94,6 
+94,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -826,4 +827,14 @@ public class YarnClientImpl extends YarnClient { UpdateApplicationPriorityRequest.newInstance(applicationId, priority); rmClient.updateApplicationPriority(request); } + + @Override + public void signalContainer(ContainerId containerId, + SignalContainerCommand command) + throws YarnException, IOException { + LOG.info("Signalling container " + containerId + " with command " + command); + SignalContainerRequest request = + SignalContainerRequest.newInstance(containerId, command); + rmClient.signalContainer(request); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index 55692f1..be89ce2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -42,8 +42,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import 
org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; @@ -148,6 +150,12 @@ public class ApplicationCLI extends YarnCLI { opts.addOption(HELP_CMD, false, "Displays help for all commands."); opts.getOption(STATUS_CMD).setArgName("Container ID"); opts.getOption(LIST_CMD).setArgName("Application Attempt ID"); + opts.addOption(SIGNAL_CMD, true, + "Signal the container. The available signal commands are " + + java.util.Arrays.asList(SignalContainerCommand.values()) + + " Default command is OUTPUT_THREAD_DUMP."); + opts.getOption(SIGNAL_CMD).setArgName("container ID [signal command]"); + opts.getOption(SIGNAL_CMD).setArgs(3); } int exitCode = -1; @@ -254,6 +262,19 @@ public class ApplicationCLI extends YarnCLI { } updateApplicationPriority(cliParser.getOptionValue(APP_ID), cliParser.getOptionValue(UPDATE_PRIORITY)); + } else if (cliParser.hasOption(SIGNAL_CMD)) { + if (args.length < 3 || args.length > 4) { + printUsage(title, opts); + return exitCode; + } + final String[] signalArgs = cliParser.getOptionValues(SIGNAL_CMD); + final String containerId = signalArgs[0]; + SignalContainerCommand command = + SignalContainerCommand.OUTPUT_THREAD_DUMP; + if (signalArgs.length == 2) { + command = SignalContainerCommand.valueOf(signalArgs[1]); + } + signalContainer(containerId, command); } else { syserr.println("Invalid Command Usage : "); printUsage(title, opts); @@ -262,6 +283,20 @@ public class ApplicationCLI extends YarnCLI { } /** + * Signals the containerId + * + * 
@param containerIdStr the container id + * @param command the signal command + * @throws YarnException + */ + private void signalContainer(String containerIdStr, + SignalContainerCommand command) throws YarnException, IOException { + ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); + sysout.println("Signalling container " + containerIdStr); + client.signalContainer(containerId, command); + } + + /** * It prints the usage of the command * * @param opts http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java index 26349fa..a0c0148 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java @@ -35,6 +35,7 @@ public abstract class YarnCLI extends Configured implements Tool { public static final String KILL_CMD = "kill"; public static final String MOVE_TO_QUEUE_CMD = "movetoqueue"; public static final String HELP_CMD = "help"; + public static final String SIGNAL_CMD = "signal"; protected PrintStream sysout; protected PrintStream syserr; protected YarnClient client; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index e584cf9..5c2f23f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; @@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.AHSClient; @@ -125,6 +127,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; public class TestYarnClient { @@ -1299,4 +1302,26 @@ public class TestYarnClient { } } } + + @Test + public void testSignalContainer() throws Exception { + Configuration conf = new Configuration(); + @SuppressWarnings("resource") + final YarnClient client = new MockYarnClient(); + 
client.init(conf); + client.start(); + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + applicationId, 1); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + SignalContainerCommand command = SignalContainerCommand.OUTPUT_THREAD_DUMP; + client.signalContainer(containerId, command); + final ArgumentCaptor<SignalContainerRequest> signalReqCaptor = + ArgumentCaptor.forClass(SignalContainerRequest.class); + verify(((MockYarnClient) client).getRMClient()) + .signalContainer(signalReqCaptor.capture()); + SignalContainerRequest request = signalReqCaptor.getValue(); + Assert.assertEquals(containerId, request.getContainerId()); + Assert.assertEquals(command, request.getCommand()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index de50467..b72fd2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -32,6 +32,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -40,6 +41,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.regex.Pattern; 
import org.apache.commons.cli.Options; import org.apache.commons.lang.time.DateFormatUtils; @@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -82,6 +85,8 @@ public class TestYarnCLI { private PrintStream sysOut; ByteArrayOutputStream sysErrStream; private PrintStream sysErr; + private static final Pattern SPACES_PATTERN = + Pattern.compile("\\s+|\\n+|\\t+"); @Before public void setup() { @@ -785,7 +790,7 @@ public class TestYarnCLI { Assert.assertTrue(result == 0); verify(spyCli).printUsage(any(String.class), any(Options.class)); Assert.assertEquals(createContainerCLIHelpMessage(), - sysOutStream.toString()); + normalize(sysOutStream.toString())); sysOutStream.reset(); ApplicationId applicationId = ApplicationId.newInstance(1234, 5); @@ -795,7 +800,7 @@ public class TestYarnCLI { new String[] {"container", "-list", appAttemptId.toString(), "args" }); verify(spyCli).printUsage(any(String.class), any(Options.class)); Assert.assertEquals(createContainerCLIHelpMessage(), - sysOutStream.toString()); + normalize(sysOutStream.toString())); sysOutStream.reset(); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 7); @@ -803,7 +808,7 @@ public class TestYarnCLI { new String[] { "container", "-status", containerId.toString(), "args" }); verify(spyCli).printUsage(any(String.class), any(Options.class)); Assert.assertEquals(createContainerCLIHelpMessage(), - sysOutStream.toString()); + normalize(sysOutStream.toString())); } @Test (timeout = 5000) @@ -1256,8 +1261,8 @@ public class TestYarnCLI { sysOutStream.reset(); result = 
cli.run(new String[] { "container", "-status" }); Assert.assertEquals(result, -1); - Assert.assertEquals(String.format("Missing argument for options%n%1s", - createContainerCLIHelpMessage()), sysOutStream.toString()); + Assert.assertEquals(String.format("Missing argument for options %1s", + createContainerCLIHelpMessage()), normalize(sysOutStream.toString())); sysOutStream.reset(); NodeCLI nodeCLI = new NodeCLI(); @@ -1538,10 +1543,17 @@ public class TestYarnCLI { pw.println("usage: container"); pw.println(" -help Displays help for all commands."); pw.println(" -list <Application Attempt ID> List containers for application attempt."); + pw.println(" -signal <container ID [signal command]> Signal the container."); + pw.println("The available signal commands are "); + pw.println(java.util.Arrays.asList(SignalContainerCommand.values())); + pw.println(" Default command is OUTPUT_THREAD_DUMP."); pw.println(" -status <Container ID> Prints the status of the container."); pw.close(); - String appsHelpStr = baos.toString("UTF-8"); - return appsHelpStr; + try { + return normalize(baos.toString("UTF-8")); + } catch (UnsupportedEncodingException infeasible) { + return infeasible.toString(); + } } private String createNodeCLIHelpMessage() throws IOException { @@ -1560,4 +1572,8 @@ public class TestYarnCLI { String nodesHelpStr = baos.toString("UTF-8"); return nodesHelpStr; } + + private static String normalize(String s) { + return SPACES_PATTERN.matcher(s).replaceAll(" "); // single space + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java index 9ccc326..8d7351d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java @@ -77,6 +77,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; @@ -125,6 +127,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateReque import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import 
org.apache.hadoop.yarn.exceptions.YarnException; @@ -527,4 +531,18 @@ public class ApplicationClientProtocolPBClientImpl implements ApplicationClientP return null; } } + + @Override + public SignalContainerResponse signalContainer( + SignalContainerRequest request) throws YarnException, IOException { + YarnServiceProtos.SignalContainerRequestProto requestProto = + ((SignalContainerRequestPBImpl) request).getProto(); + try { + return new SignalContainerResponsePBImpl( + proxy.signalContainer(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java index 6ca2136..b9485a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetApplicationAttemptReportRequestPBImpl; @@ -102,6 +103,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPrior import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptReportResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetApplicationAttemptsRequestProto; @@ -140,8 +142,11 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionReque import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -529,4 +534,18 @@ public class 
ApplicationClientProtocolPBServiceImpl implements ApplicationClient throw new ServiceException(e); } } + + @Override + public SignalContainerResponseProto signalContainer(RpcController controller, + YarnServiceProtos.SignalContainerRequestProto proto) throws ServiceException { + SignalContainerRequestPBImpl request = new SignalContainerRequestPBImpl(proto); + try { + SignalContainerResponse response = real.signalContainer(request); + return ((SignalContainerResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java new file mode 100644 index 0000000..5618a7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerRequestPBImpl.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SignalContainerCommandProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProtoOrBuilder; + + +public class SignalContainerRequestPBImpl + extends SignalContainerRequest { + SignalContainerRequestProto proto = + SignalContainerRequestProto.getDefaultInstance(); + SignalContainerRequestProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId containerId; + private SignalContainerCommand command = null; + + private static SignalContainerCommand convertFromProtoFormat( + SignalContainerCommandProto p) { + return SignalContainerCommand.valueOf(p.name()); + } + + private static SignalContainerCommandProto convertToProtoFormat( + SignalContainerCommand p) { + return SignalContainerCommandProto.valueOf(p.name()); + } + + public SignalContainerRequestPBImpl() { + builder = SignalContainerRequestProto.newBuilder(); + } + + public SignalContainerRequestPBImpl(SignalContainerRequestProto proto) { + this.proto = proto; + viaProto = true; + } 
+ + public SignalContainerRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.containerId != null) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } + + if (this.command != null) { + builder.setCommand(convertToProtoFormat(this.command)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SignalContainerRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public ContainerId getContainerId() { + SignalContainerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerId != null) { + return this.containerId; + } + if (!p.hasContainerId()) { + return null; + } + this.containerId = convertFromProtoFormat(p.getContainerId()); + return this.containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) { + builder.clearContainerId(); + } + this.containerId = containerId; + } + + private void initCommand() { + if (this.command != null) { + return; + } + SignalContainerRequestProtoOrBuilder p = viaProto ? 
proto : builder; + if(p.hasCommand()) { + this.command = convertFromProtoFormat(p.getCommand()); + } + } + + @Override + public SignalContainerCommand getCommand() { + initCommand(); + return command; + } + + @Override + public void setCommand(SignalContainerCommand command) { + maybeInitBuilder(); + if (command == null) { + builder.clearCommand(); + } + this.command = command; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl)t).getProto(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java new file mode 100644 index 0000000..b0aae14 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SignalContainerResponsePBImpl.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; + +public class SignalContainerResponsePBImpl extends SignalContainerResponse { + SignalContainerResponseProto proto = SignalContainerResponseProto.getDefaultInstance(); + SignalContainerResponseProto.Builder builder = null; + boolean viaProto = false; + + public SignalContainerResponsePBImpl() { + builder = SignalContainerResponseProto.newBuilder(); + } + + public SignalContainerResponsePBImpl(SignalContainerResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SignalContainerResponseProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index c0ccf57..f8a1320 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -56,6 +57,8 @@ public interface NodeHeartbeatResponse { void addAllApplicationsToCleanup(List<ApplicationId> applications); + List<SignalContainerRequest> getContainersToSignalList(); + void addAllContainersToSignal(List<SignalContainerRequest> containers); long getNextHeartBeatInterval(); void setNextHeartBeatInterval(long nextHeartBeatInterval); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index dc65141..224e50b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -26,6 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import 
org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; @@ -62,8 +65,8 @@ public class NodeHeartbeatResponsePBImpl extends private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; - private List<Container> containersToDecrease = null; + private List<SignalContainerRequest> containersToSignal = null; public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); @@ -105,6 +108,9 @@ public class NodeHeartbeatResponsePBImpl extends if (this.containersToDecrease != null) { addContainersToDecreaseToProto(); } + if (this.containersToSignal != null) { + addContainersToSignalToProto(); + } } private void addSystemCredentialsToProto() { @@ -571,5 +577,75 @@ public class NodeHeartbeatResponsePBImpl extends maybeInitBuilder(); this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM); } + + @Override + public List<SignalContainerRequest> getContainersToSignalList() { + initContainersToSignal(); + return this.containersToSignal; + } + + private void initContainersToSignal() { + if (this.containersToSignal != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; + List<SignalContainerRequestProto> list = p.getContainersToSignalList(); + this.containersToSignal = new ArrayList<SignalContainerRequest>(); + + for (SignalContainerRequestProto c : list) { + this.containersToSignal.add(convertFromProtoFormat(c)); + } + } + + @Override + public void addAllContainersToSignal( + final List<SignalContainerRequest> containersToSignal) { + if (containersToSignal == null) + return; + initContainersToSignal(); + this.containersToSignal.addAll(containersToSignal); + } + + private void addContainersToSignalToProto() { + maybeInitBuilder(); + builder.clearContainersToSignal(); + if (containersToSignal == null) + return; + + Iterable<SignalContainerRequestProto> iterable = + new Iterable<SignalContainerRequestProto>() { + @Override + public Iterator<SignalContainerRequestProto> iterator() { + return new Iterator<SignalContainerRequestProto>() { + Iterator<SignalContainerRequest> iter = containersToSignal.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public SignalContainerRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainersToSignal(iterable); + } + + private SignalContainerRequestPBImpl convertFromProtoFormat( + SignalContainerRequestProto p) { + return new SignalContainerRequestPBImpl(p); + } + + private SignalContainerRequestProto convertToProtoFormat( + SignalContainerRequest t) { + return ((SignalContainerRequestPBImpl)t).getProto(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 2db8919..a54bbdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -24,6 +24,7 @@ package hadoop.yarn; import "yarn_protos.proto"; import "yarn_server_common_protos.proto"; +import "yarn_service_protos.proto"; message NodeLabelsProto { repeated NodeLabelProto nodeLabels = 1; @@ -83,6 +84,7 @@ message NodeHeartbeatResponseProto { repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; repeated ContainerProto containers_to_decrease = 12; + repeated SignalContainerRequestProto containers_to_signal = 13; } message SystemCredentialsForAppsProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java new file mode 100644 index 0000000..b0dc3af --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrSignalContainersEvent.java @@ -0,0 +1,37 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager; + +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; + +public class CMgrSignalContainersEvent extends ContainerManagerEvent { + + private List<SignalContainerRequest> containerToSignal; + + public CMgrSignalContainersEvent(List<SignalContainerRequest> containerToSignal) { + super(ContainerManagerEventType.SIGNAL_CONTAINERS); + this.containerToSignal = containerToSignal; + } + + public List<SignalContainerRequest> getContainersToSignal() { + return this.containerToSignal; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java index fcb0252..8861bc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerManagerEventType.java @@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager; public enum ContainerManagerEventType { FINISH_APPS, FINISH_CONTAINERS, - DECREASE_CONTAINERS_RESOURCE + DECREASE_CONTAINERS_RESOURCE, + SIGNAL_CONTAINERS } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f8ce90f..1b186c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -45,6 +45,7 @@ import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -800,6 +801,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new CMgrDecreaseContainersResourceEvent(containersToDecrease) ); } + + // SignalContainer request originally comes from end users via + // ClientRMProtocol's SignalContainer. Forward the request to + // ContainerManager which will dispatch the event to ContainerLauncher. + List<SignalContainerRequest> containersToSignal = response + .getContainersToSignalList(); + if (containersToSignal.size() != 0) { + dispatcher.getEventHandler().handle( + new CMgrSignalContainersEvent(containersToSignal)); + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 39d2983..f44de59 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -97,6 +98,7 @@ import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -121,6 +123,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; 
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; @@ -1349,6 +1352,23 @@ public class ContainerManagerImpl extends CompositeService implements } } break; + case SIGNAL_CONTAINERS: + CMgrSignalContainersEvent containersSignalEvent = + (CMgrSignalContainersEvent) event; + for (SignalContainerRequest request : containersSignalEvent + .getContainersToSignal()) { + ContainerId containerId = request.getContainerId(); + Container container = this.context.getContainers().get(containerId); + if (container != null) { + LOG.info(containerId + " signal request by ResourceManager."); + this.dispatcher.getEventHandler().handle( + new SignalContainersLauncherEvent(container, + request.getCommand())); + } else { + LOG.info("Container " + containerId + " no longer exists"); + } + } + break; default: throw new YarnRuntimeException( "Got an unknown ContainerManagerEvent type: " + event.getType()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index bf00d74..9718098 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.ipc.RPCUtil; @@ -462,6 +463,100 @@ public class ContainerLaunch implements Callable<Integer> { } /** + * Send a signal to the container. + * Only reads the launch flag, so an unlaunched container is left untouched. + * @param command the signal command to translate and deliver + * @throws IOException declared by the signature; signaling failures are logged + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void signalContainer(SignalContainerCommand command) + throws IOException { + ContainerId containerId = + container.getContainerTokenIdentifier().getContainerID(); + String containerIdStr = ConverterUtils.toString(containerId); + String user = container.getUser(); + Signal signal = translateCommandToSignal(command); + if (signal.equals(Signal.NULL)) { + LOG.info("ignore signal command " + command); + return; + } + + LOG.info("Sending signal " + command + " to container " + containerIdStr); + + boolean alreadyLaunched = shouldLaunchContainer.get(); + if (!alreadyLaunched) { + LOG.info("Container " + containerIdStr + " not launched." + + " Not sending the signal"); + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Getting pid for container " + containerIdStr + + " to send signal to from pid file " + + (pidFilePath != null ?
pidFilePath.toString() : "null")); + } + + try { + // get process id from pid file if available + // else if shell is still active, get it from the shell + String processId = null; + if (pidFilePath != null) { + processId = getContainerPid(pidFilePath); + } + + if (processId != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending signal to pid " + processId + + " as user " + user + + " for container " + containerIdStr); + } + + boolean result = exec.signalContainer( + new ContainerSignalContext.Builder() + .setContainer(container) + .setUser(user) + .setPid(processId) + .setSignal(signal) + .build()); + + String diagnostics = "Sent signal " + command + + " (" + signal + ") to pid " + processId + + " as user " + user + + " for container " + containerIdStr + + ", result=" + (result ? "success" : "failed"); + LOG.info(diagnostics); + + dispatcher.getEventHandler().handle( + new ContainerDiagnosticsUpdateEvent(containerId, diagnostics)); + } + } catch (Exception e) { + String message = + "Exception when sending signal to container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.warn(message); + } + } + + @VisibleForTesting + public static Signal translateCommandToSignal( + SignalContainerCommand command) { + Signal signal = Signal.NULL; + switch (command) { + case OUTPUT_THREAD_DUMP: + // TODO for windows support. + signal = Shell.WINDOWS ? Signal.NULL: Signal.QUIT; + break; + case GRACEFUL_SHUTDOWN: + signal = Signal.TERM; + break; + case FORCEFUL_SHUTDOWN: + signal = Signal.KILL; + break; + } + return signal; + } + + /** * Loop through for a time-bounded interval waiting to * read the process id from a file generated by a running process. 
* @param pidFilePath File from which to read the process id http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 6950aa9..3a2649e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -142,7 +142,23 @@ public class ContainersLauncher extends AbstractService + ". Ignoring."); } break; + case SIGNAL_CONTAINER: + SignalContainersLauncherEvent signalEvent = + (SignalContainersLauncherEvent) event; + ContainerLaunch runningContainer = running.get(containerId); + if (runningContainer == null) { + // Container not launched. So nothing needs to be done. 
+ LOG.info("Container " + containerId + " not running, nothing to signal."); + return; + } + + try { + runningContainer.signalContainer(signalEvent.getCommand()); + } catch (IOException e) { + LOG.warn("Got exception while signaling container " + containerId + + " with command " + signalEvent.getCommand(), e); + } + break; } } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index 385b5b2..a88564d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -22,4 +22,5 @@ public enum ContainersLauncherEventType { LAUNCH_CONTAINER, RECOVER_CONTAINER, CLEANUP_CONTAINER, // The process(grp) itself.
+ SIGNAL_CONTAINER, } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java new file mode 100644 index 0000000..de544f0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/SignalContainersLauncherEvent.java @@ -0,0 +1,38 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import org.apache.hadoop.yarn.api.records.SignalContainerCommand; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +// This event can be triggered by one of the following flows +// WebUI -> Container +// CLI -> RM -> NM +public class SignalContainersLauncherEvent extends ContainersLauncherEvent{ + + private final SignalContainerCommand command; + public SignalContainersLauncherEvent(Container container, + SignalContainerCommand command) { + super(container, ContainersLauncherEventType.SIGNAL_CONTAINER); + this.command = command; + } + public SignalContainerCommand getCommand() { + return command; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8f08532b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index 75bcdae..3e00885 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -222,6 +222,42 @@ public class TestContainerManagerWithLCE extends TestContainerManager { super.testChangeContainerResource(); } + @Override + public void testOutputThreadDumpSignal() throws IOException, + 
InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testOutputThreadDumpSignal"); + super.testOutputThreadDumpSignal(); + } + + @Override + public void testGracefulShutdownSignal() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testGracefulShutdownSignal"); + super.testGracefulShutdownSignal(); + } + + @Override + public void testForcefulShutdownSignal() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testForcefulShutdownSignal"); + super.testForcefulShutdownSignal(); + } + private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
