YARN-5292. NM Container lifecycle and state transitions to support the PAUSED container state. (Hitesh Sharma via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8752f537 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8752f537 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8752f537 Branch: refs/heads/YARN-5972 Commit: 8752f5376d47c715780cc49a0f7bb18a1050e472 Parents: 80b8023 Author: Arun Suresh <[email protected]> Authored: Fri Dec 9 07:51:03 2016 -0800 Committer: Arun Suresh <[email protected]> Committed: Fri Dec 9 07:51:03 2016 -0800 ---------------------------------------------------------------------- .../hadoop/yarn/api/records/ContainerState.java | 7 +- .../src/main/proto/yarn_protos.proto | 1 + .../server/nodemanager/ContainerExecutor.java | 22 +++ .../container/ContainerEventType.java | 6 +- .../container/ContainerImpl.java | 170 ++++++++++++++++++- .../container/ContainerPauseEvent.java | 40 +++++ .../container/ContainerResumeEvent.java | 39 +++++ .../container/ContainerState.java | 3 +- .../launcher/ContainerLaunch.java | 90 +++++++++- .../launcher/ContainersLauncher.java | 32 ++++ .../launcher/ContainersLauncherEventType.java | 3 + .../scheduler/ContainerSchedulerEventType.java | 1 + .../container/TestContainer.java | 51 ++++++ 13 files changed, 454 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java index 4efd8c1..2d83cfd 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java @@ -32,10 +32,13 @@ public enum ContainerState { /** Running container */ RUNNING, - + /** Completed container */ COMPLETE, /** Scheduled (awaiting resources) at the NM. */ - SCHEDULED + SCHEDULED, + + /** Paused at the NM. */ + PAUSED } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 5a70298..9cd6348 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -83,6 +83,7 @@ enum ContainerStateProto { C_RUNNING = 2; C_COMPLETE = 3; C_SCHEDULED = 4; + C_PAUSED = 5; } message ContainerProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index f880506..ffa125a 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -684,6 +684,28 @@ public abstract class ContainerExecutor implements Configurable { } /** + * Pause the container. The default implementation logs a warning and + * throws UnsupportedOperationException; executors may override this. + * @param container + * the Container + */ + public void pauseContainer(Container container) { + LOG.warn(container.getContainerId() + " doesn't support pausing."); + throw new UnsupportedOperationException(); + } + + /** + * Resume the container from pause state. The default implementation logs a + * warning and throws UnsupportedOperationException; executors may override. + * @param container + * the Container + */ + public void resumeContainer(Container container) { + LOG.warn(container.getContainerId() + " doesn't support resume."); + throw new UnsupportedOperationException(); + } + + /** + * Get the process-identifier for the container. 
* * @param containerID the container ID http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index afea0e6..1475435 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -27,6 +27,8 @@ public enum ContainerEventType { CONTAINER_DONE, REINITIALIZE_CONTAINER, ROLLBACK_REINIT, + PAUSE_CONTAINER, + RESUME_CONTAINER, // DownloadManager CONTAINER_INITED, @@ -38,5 +40,7 @@ public enum ContainerEventType { CONTAINER_LAUNCHED, CONTAINER_EXITED_WITH_SUCCESS, CONTAINER_EXITED_WITH_FAILURE, - CONTAINER_KILLED_ON_REQUEST + CONTAINER_KILLED_ON_REQUEST, + CONTAINER_PAUSED, + CONTAINER_RESUMED } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 4a6be32..ec7ee49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -298,6 +298,8 @@ public class ContainerImpl implements Container { UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.NEW, ContainerState.DONE, ContainerEventType.KILL_CONTAINER, new KillOnNewTransition()) + .addTransition(ContainerState.NEW, ContainerState.DONE, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) // From LOCALIZING State .addTransition(ContainerState.LOCALIZING, @@ -313,6 +315,8 @@ public class ContainerImpl implements Container { .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillBeforeRunningTransition()) + .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) // From LOCALIZATION_FAILED State .addTransition(ContainerState.LOCALIZATION_FAILED, @@ -326,7 +330,8 @@ public class ContainerImpl implements Container { // container not launched so kill is a no-op .addTransition(ContainerState.LOCALIZATION_FAILED, ContainerState.LOCALIZATION_FAILED, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) // container cleanup triggers a release of all 
resources // regardless of whether they were localized or not // LocalizedResource handles release event in all states @@ -382,6 +387,76 @@ public class ContainerImpl implements Container { ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.PAUSING, + ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition()) + + // From PAUSING State + .addTransition(ContainerState.PAUSING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.PAUSING, ContainerState.PAUSING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.PAUSING, ContainerState.PAUSED, + ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition()) + // In case something goes wrong then container will exit from the + // PAUSING state + .addTransition(ContainerState.PAUSING, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS) + .addTransition(ContainerState.PAUSING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) + + // From PAUSED State + .addTransition(ContainerState.PAUSED, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.PAUSED, ContainerState.PAUSED, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.PAUSED, ContainerState.PAUSED, + ContainerEventType.PAUSE_CONTAINER) + .addTransition(ContainerState.PAUSED, ContainerState.RESUMING, + ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition()) + // In case something goes wrong then 
container will exit from the + // PAUSED state + .addTransition(ContainerState.PAUSED, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) + .addTransition(ContainerState.PAUSED, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + new ExitedWithSuccessTransition(true)) + + // From RESUMING State + .addTransition(ContainerState.RESUMING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.RESUMING, ContainerState.RUNNING, + ContainerEventType.CONTAINER_RESUMED) + .addTransition(ContainerState.RESUMING, ContainerState.RESUMING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + // In case something goes wrong then container will exit from the + // RESUMING state + .addTransition(ContainerState.RESUMING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.RESUMING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) + .addTransition(ContainerState.RESUMING, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + new ExitedWithSuccessTransition(true)) // From REINITIALIZING State .addTransition(ContainerState.REINITIALIZING, @@ -405,6 +480,8 @@ public class ContainerImpl implements Container { UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) 
.addTransition(ContainerState.REINITIALIZING, ContainerState.SCHEDULED, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, @@ -422,6 +499,8 @@ public class ContainerImpl implements Container { UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) // From CONTAINER_EXITED_WITH_SUCCESS State .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, @@ -433,7 +512,8 @@ public class ContainerImpl implements Container { UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.EXITED_WITH_SUCCESS, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) // From EXITED_WITH_FAILURE State .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE, @@ -445,7 +525,8 @@ public class ContainerImpl implements Container { UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.EXITED_WITH_FAILURE, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) // From KILLING State. .addTransition(ContainerState.KILLING, @@ -479,7 +560,8 @@ public class ContainerImpl implements Container { // in the container launcher .addTransition(ContainerState.KILLING, ContainerState.KILLING, - ContainerEventType.CONTAINER_LAUNCHED) + EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.PAUSE_CONTAINER)) // From CONTAINER_CLEANEDUP_AFTER_KILL State. 
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, @@ -495,11 +577,13 @@ public class ContainerImpl implements Container { EnumSet.of(ContainerEventType.KILL_CONTAINER, ContainerEventType.RESOURCE_FAILED, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, - ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerEventType.PAUSE_CONTAINER)) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) .addTransition(ContainerState.DONE, ContainerState.DONE, ContainerEventType.INIT_CONTAINER) .addTransition(ContainerState.DONE, ContainerState.DONE, @@ -525,6 +609,8 @@ public class ContainerImpl implements Container { case LOCALIZING: case LOCALIZATION_FAILED: case SCHEDULED: + case PAUSED: + case RESUMING: return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED; case RUNNING: case RELAUNCHING: @@ -534,6 +620,7 @@ public class ContainerImpl implements Container { case KILLING: case CONTAINER_CLEANEDUP_AFTER_KILL: case CONTAINER_RESOURCES_CLEANINGUP: + case PAUSING: return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING; case DONE: default: @@ -1473,6 +1560,26 @@ public class ContainerImpl implements Container { } /** + * Transitions upon receiving PAUSE_CONTAINER. + * - LOCALIZED -> KILLING. + * - REINITIALIZING -> KILLING. 
+ */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class KillOnPauseTransition implements + SingleArcTransition<ContainerImpl, ContainerEvent> { + + @SuppressWarnings("unchecked") + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Kill the process/process-grp + container.setIsReInitializing(false); + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } + } + + /** * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL * upon receiving CONTAINER_KILLED_ON_REQUEST. */ @@ -1661,6 +1768,57 @@ public class ContainerImpl implements Container { } } + /** + * Transitions upon receiving PAUSE_CONTAINER. + * - RUNNING -> PAUSED + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class PauseContainerTransition implements + SingleArcTransition<ContainerImpl, ContainerEvent> { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Pause the process/process-grp if it is supported by the container + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.PAUSE_CONTAINER)); + ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event; + container.addDiagnostics(pauseEvent.getDiagnostic(), "\n"); + } + } + + /** + * Transitions upon receiving PAUSED_CONTAINER. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class PausedContainerTransition implements + SingleArcTransition<ContainerImpl, ContainerEvent> { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Container was PAUSED so tell the scheduler + container.dispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(container, + ContainerSchedulerEventType.CONTAINER_PAUSED)); + } + } + + /** + * Transitions upon receiving RESUME_CONTAINER. 
+ * - PAUSED -> RUNNING + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class ResumeContainerTransition implements + SingleArcTransition<ContainerImpl, ContainerEvent> { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Pause the process/process-grp if it is supported by the container + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.RESUME_CONTAINER)); + ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event; + container.addDiagnostics(resumeEvent.getDiagnostic(), "\n"); + } + } + @Override public void handle(ContainerEvent event) { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java new file mode 100644 index 0000000..898304e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +/** + * ContainerEvent for ContainerEventType.PAUSE_CONTAINER. + */ +public class ContainerPauseEvent extends ContainerEvent { + + private final String diagnostic; + + public ContainerPauseEvent(ContainerId cId, + String diagnostic) { + super(cId, ContainerEventType.PAUSE_CONTAINER); + this.diagnostic = diagnostic; + } + + public String getDiagnostic() { + return this.diagnostic; + } +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java new file mode 100644 index 0000000..d7c9e9a --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +/** + * ContainerEvent for ContainerEventType.RESUME_CONTAINER. 
+ */ +public class ContainerResumeEvent extends ContainerEvent { + + private final String diagnostic; + + public ContainerResumeEvent(ContainerId cId, + String diagnostic) { + super(cId, ContainerEventType.RESUME_CONTAINER); + this.diagnostic = diagnostic; + } + + public String getDiagnostic() { + return this.diagnostic; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index 91d1356..7c3fea8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; public enum ContainerState { NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING, REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, - CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE + CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE, + PAUSING, PAUSED, RESUMING } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 823457f..2516e11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.util.Apps; import 
org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.ConverterUtils; public class ContainerLaunch implements Callable<Integer> { @@ -103,8 +105,10 @@ public class ContainerLaunch implements Callable<Integer> { private final Configuration conf; private final Context context; private final ContainerManagerImpl containerManager; - + protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false); + protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false); + protected AtomicBoolean completed = new AtomicBoolean(false); private volatile boolean killedBeforeStart = false; @@ -746,6 +750,90 @@ public class ContainerLaunch implements Callable<Integer> { } /** + * Pause the container. + * Cancels the launch if the container isn't launched yet. Otherwise asks the + * executor to pause the container. + * @throws IOException in case of errors. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void pauseContainer() throws IOException { + ContainerId containerId = container.getContainerId(); + String containerIdStr = containerId.toString(); + LOG.info("Pausing the container " + containerIdStr); + + // The pause event is only handled if the container is in the running state + // (the container state machine), so we don't check for + // shouldLaunchContainer over here + + if (!shouldPauseContainer.compareAndSet(false, true)) { + LOG.info("Container " + containerId + " not paused as " + + "resume already called"); + return; + } + + try { + // Pause the container + exec.pauseContainer(container); + + // PauseContainer is a blocking call. We are here almost means the + // container is paused, so send out the event. 
+ dispatcher.getEventHandler().handle(new ContainerEvent( + containerId, + ContainerEventType.CONTAINER_PAUSED)); + } catch (Exception e) { + String message = + "Exception when trying to pause container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.info(message); + container.handle(new ContainerKillEvent(container.getContainerId(), + ContainerExitStatus.PREEMPTED, "Container preempted as there was " + + " an exception in pausing it.")); + } + } + + /** + * Resume the container. + * Cancels the launch if the container isn't launched yet. Otherwise asks the + * executor to pause the container. + * @throws IOException in case of error. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void resumeContainer() throws IOException { + ContainerId containerId = container.getContainerId(); + String containerIdStr = containerId.toString(); + LOG.info("Resuming the container " + containerIdStr); + + // The resume event is only handled if the container is in a paused state + // so we don't check for the launched flag here. + + // paused flag will be set to true if process already paused + boolean alreadyPaused = !shouldPauseContainer.compareAndSet(false, true); + if (!alreadyPaused) { + LOG.info("Container " + containerIdStr + " not paused." + + " No resume necessary"); + return; + } + + // If the container has already started + try { + exec.resumeContainer(container); + // ResumeContainer is a blocking call. We are here almost means the + // container is resumed, so send out the event. 
+ dispatcher.getEventHandler().handle(new ContainerEvent( + containerId, + ContainerEventType.CONTAINER_RESUMED)); + } catch (Exception e) { + String message = + "Exception when trying to resume container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.info(message); + container.handle(new ContainerKillEvent(container.getContainerId(), + ContainerExitStatus.PREEMPTED, "Container preempted as there was " + + " an exception in pausing it.")); + } + } + + /** * Loop through for a time-bounded interval waiting to * read the process id from a file generated by a running process. * @param pidFilePath File from which to read the process id http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index d4a7bfd..eb6eaf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import 
org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; @@ -170,6 +172,36 @@ public class ContainersLauncher extends AbstractService + " with command " + signalEvent.getCommand()); } break; + case PAUSE_CONTAINER: + ContainerLaunch launchedContainer = running.get(containerId); + if (launchedContainer == null) { + // Container not launched. So nothing needs to be done. + return; + } + + // Pause the container + try { + launchedContainer.pauseContainer(); + } catch (Exception e) { + LOG.info("Got exception while pausing container: " + + StringUtils.stringifyException(e)); + } + break; + case RESUME_CONTAINER: + ContainerLaunch launchCont = running.get(containerId); + if (launchCont == null) { + // Container not launched. So nothing needs to be done. + return; + } + + // Resume the container. 
+ try { + launchCont.resumeContainer(); + } catch (Exception e) { + LOG.info("Got exception while resuming container: " + + StringUtils.stringifyException(e)); + } + break; } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index 380a032..1054e06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -25,4 +25,7 @@ public enum ContainersLauncherEventType { CLEANUP_CONTAINER, // The process(grp) itself. CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself. 
SIGNAL_CONTAINER, + PAUSE_CONTAINER, + RESUME_CONTAINER + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 086cb9b..9ff731f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -26,4 +26,5 @@ public enum ContainerSchedulerEventType { CONTAINER_COMPLETED, // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, + CONTAINER_PAUSED } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8752f537/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 33f4609..8909088 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; public class TestContainer { @@ -207,6 +208,42 @@ public class TestContainer { @Test @SuppressWarnings("unchecked") // mocked generic + public void testContainerPauseAndResume() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(13, 314159265358979L, 4344, "yak"); + wc.initContainer(); + wc.localizeResources(); + int running = metrics.getRunningContainers(); + wc.launchContainer(); + assertEquals(running + 1, metrics.getRunningContainers()); + reset(wc.localizerBus); + wc.pauseContainer(); + assertEquals(ContainerState.PAUSED, + wc.c.getContainerState()); + wc.resumeContainer(); + assertEquals(ContainerState.RUNNING, + wc.c.getContainerState()); + wc.containerKilledOnRequest(); + assertEquals(ContainerState.EXITED_WITH_FAILURE, + wc.c.getContainerState()); + assertNull(wc.c.getLocalizedResources()); + verifyCleanupCall(wc); + int failed = metrics.getFailedContainers(); + wc.containerResourcesCleanup(); + assertEquals(ContainerState.DONE, wc.c.getContainerState()); + assertEquals(failed + 1, metrics.getFailedContainers()); + assertEquals(running, metrics.getRunningContainers()); + } + finally { + if (wc != null) { + wc.finished(); + } + } + } + + @Test + 
@SuppressWarnings("unchecked") // mocked generic public void testCleanupOnFailure() throws Exception { WrappedContainer wc = null; try { @@ -955,6 +992,8 @@ public class TestContainer { NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class); when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater); ContainerExecutor executor = mock(ContainerExecutor.class); + Mockito.doNothing().when(executor).pauseContainer(any(Container.class)); + Mockito.doNothing().when(executor).resumeContainer(any(Container.class)); launcher = new ContainersLauncher(context, dispatcher, executor, null, null); // create a mock ExecutorService, which will not really launch @@ -1143,6 +1182,18 @@ public class TestContainer { drainDispatcherEvents(); } + public void pauseContainer() { + c.handle(new ContainerPauseEvent(cId, + "PauseRequest")); + drainDispatcherEvents(); + } + + public void resumeContainer() { + c.handle(new ContainerResumeEvent(cId, + "ResumeRequest")); + drainDispatcherEvents(); + } + public void containerKilledOnRequest() { int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER; String diagnosticMsg = "Container completed with exit code " + exitCode; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
