YARN-7644. NM gets backed up deleting docker containers. Contributed by Chandni 
Singh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ce70e12
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ce70e12
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ce70e12

Branch: refs/heads/HDFS-12943
Commit: 5ce70e1211e624d58e8bb1181aec00729ebdc085
Parents: cd28051
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Oct 10 09:52:19 2018 -0500
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Oct 10 09:52:19 2018 -0500

----------------------------------------------------------------------
 .../launcher/ContainerCleanup.java              | 229 +++++++++++++++++++
 .../launcher/ContainerLaunch.java               | 226 +++++-------------
 .../launcher/ContainersLauncher.java            |  14 +-
 .../launcher/TestContainerCleanup.java          | 108 +++++++++
 .../launcher/TestContainersLauncher.java        |  12 +-
 5 files changed, 401 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ce70e12/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerCleanup.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerCleanup.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerCleanup.java
new file mode 100644
index 0000000..963d28b
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerCleanup.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
+import 
org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.EXIT_CODE_FILE_SUFFIX;
+
+/**
+ * Cleanup the container.
+ * Cancels the launch if launch has not started yet or signals
+ * the executor to not execute the process if not already done so.
+ * Also, sends a SIGTERM followed by a SIGKILL to the process if
+ * the process id is available.
+ */
+public class ContainerCleanup implements Runnable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerCleanup.class);
+
+  // Used in run() to reach the NM state store and record the kill.
+  private final Context context;
+  // Source of the SIGKILL delay; also handed to the docker-requested check.
+  private final Configuration conf;
+  // Receives the ContainerExitEvent / ContainerDiagnosticsUpdateEvent
+  // raised during cleanup.
+  private final Dispatcher dispatcher;
+  // Executor used to deactivate, signal and reap the container.
+  private final ContainerExecutor exec;
+  // The container being cleaned up.
+  private final Container container;
+  // The launch associated with the container; supplies the launched flag,
+  // the pid file path, the pid and the final reap.
+  private final ContainerLaunch launch;
+  // Value of NM_SLEEP_DELAY_BEFORE_SIGKILL_MS. When positive, TERM is sent
+  // first and KILL is scheduled after this delay; otherwise KILL is sent
+  // straight away (see signalProcess).
+  private final long sleepDelayBeforeSigKill;
+
+
+  /**
+   * Creates a cleanup task for one container.
+   * Every argument is passed through {@code Preconditions.checkNotNull},
+   * so none of them may be null. The delay before SIGKILL is read once
+   * from the configuration here.
+   *
+   * @param context node manager context
+   * @param configuration configuration to read the SIGKILL delay from
+   * @param dispatcher dispatcher whose event handler receives cleanup events
+   * @param exec executor used to deactivate, signal and reap the container
+   * @param container the container to clean up
+   * @param containerLaunch the launch that belongs to the container
+   */
+  public ContainerCleanup(Context context, Configuration configuration,
+      Dispatcher dispatcher, ContainerExecutor exec,
+      Container container,
+      ContainerLaunch containerLaunch) {
+
+    this.context = Preconditions.checkNotNull(context, "context");
+    this.conf = Preconditions.checkNotNull(configuration, "config");
+    this.dispatcher = Preconditions.checkNotNull(dispatcher, "dispatcher");
+    this.exec = Preconditions.checkNotNull(exec, "exec");
+    this.container = Preconditions.checkNotNull(container, "container");
+    this.launch = Preconditions.checkNotNull(containerLaunch, "launch");
+    this.sleepDelayBeforeSigKill = conf.getLong(
+        YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
+        YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
+  }
+
+  /**
+   * Performs the cleanup. In order:
+   * records the kill in the NM state store (a failure is only logged),
+   * returns early if the launch had not been marked launched yet,
+   * deactivates the container in the executor,
+   * signals the process when a pid is available, otherwise raises a
+   * killed-on-request exit event (if the launch is not completed) and
+   * reaps a requested docker container,
+   * deletes the pid and exit code files,
+   * and finally asks the launch to reap the container.
+   * Exceptions from the signalling step are logged and reported as a
+   * diagnostics update rather than propagated.
+   */
+  @Override
+  public void run() {
+    ContainerId containerId = container.getContainerId();
+    String containerIdStr = containerId.toString();
+    LOG.info("Cleaning up container " + containerIdStr);
+
+    try {
+      context.getNMStateStore().storeContainerKilled(containerId);
+    } catch (IOException e) {
+      LOG.error("Unable to mark container " + containerId
+          + " killed in store", e);
+    }
+
+    // launch flag will be set to true if process already launched
+    // markLaunched() returns true only when this call flipped the flag,
+    // i.e. when nothing had been launched before; hence the negation.
+    boolean alreadyLaunched = !launch.markLaunched();
+    if (!alreadyLaunched) {
+      LOG.info("Container " + containerIdStr + " not launched."
+          + " No cleanup needed to be done");
+      return;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Marking container " + containerIdStr + " as inactive");
+    }
+    // this should ensure that if the container process has not launched
+    // by this time, it will never be launched
+    exec.deactivateContainer(containerId);
+    // Captured before the try block so the finally clause can delete it.
+    Path pidFilePath = launch.getPidFilePath();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Getting pid for container {} to kill"
+              + " from pid file {}", containerIdStr, pidFilePath != null ?
+          pidFilePath : "null");
+    }
+
+    // however the container process may have already started
+    try {
+
+      // get process id from pid file if available
+      // else if shell is still active, get it from the shell
+      String processId = launch.getContainerPid();
+
+      // kill process
+      String user = container.getUser();
+      if (processId != null) {
+        signalProcess(processId, user, containerIdStr);
+      } else {
+        // Normally this means that the process was notified about
+        // deactivateContainer above and did not start.
+        // Since we already set the state to RUNNING or REINITIALIZING
+        // we have to send a killed event to continue.
+        if (!launch.isLaunchCompleted()) {
+          LOG.warn("Container clean up before pid file created "
+              + containerIdStr);
+          dispatcher.getEventHandler().handle(
+              new ContainerExitEvent(container.getContainerId(),
+                  ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+                  Shell.WINDOWS ?
+                      ContainerExecutor.ExitCode.FORCE_KILLED.getExitCode() :
+                      ContainerExecutor.ExitCode.TERMINATED.getExitCode(),
+                  "Container terminated before pid file created."));
+          // There is a possibility that the launch grabbed the file name 
before
+          // the deactivateContainer above but it was slow enough to avoid
+          // getContainerPid.
+          // Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS
+          // reduces the likelihood of this race condition and process leak.
+        }
+        // The Docker container may not have fully started, reap the container.
+        if (DockerLinuxContainerRuntime.isDockerContainerRequested(conf,
+            container.getLaunchContext().getEnvironment())) {
+          reapDockerContainerNoPid(user);
+        }
+      }
+    } catch (Exception e) {
+      // Best effort: surface the failure as container diagnostics and carry
+      // on with pid file removal and the reap below.
+      String message =
+          "Exception when trying to cleanup container " + containerIdStr
+              + ": " + StringUtils.stringifyException(e);
+      LOG.warn(message);
+      dispatcher.getEventHandler().handle(
+          new ContainerDiagnosticsUpdateEvent(containerId, message));
+    } finally {
+      // cleanup pid file if present
+      if (pidFilePath != null) {
+        try {
+          FileContext lfs = FileContext.getLocalFSFileContext();
+          lfs.delete(pidFilePath, false);
+          lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
+        } catch (IOException ioe) {
+          LOG.warn("{} exception trying to delete pid file {}. Ignoring.",
+              containerId, pidFilePath, ioe);
+        }
+      }
+    }
+
+    try {
+      // Reap the container
+      launch.reapContainer();
+    } catch (IOException ioe) {
+      LOG.warn("{} exception trying to reap container. Ignoring.", containerId,
+          ioe);
+    }
+  }
+
+  /**
+   * Signals the container process. Sends TERM when
+   * {@code sleepDelayBeforeSigKill} is positive and KILL otherwise. In the
+   * TERM case a {@code DelayedProcessKiller} is additionally started with
+   * the KILL signal and the configured delay.
+   *
+   * @param processId pid of the process to signal
+   * @param user user the signal is sent as
+   * @param containerIdStr container id, used for log messages only
+   * @throws IOException if sending the first signal fails
+   */
+  private void signalProcess(String processId, String user,
+      String containerIdStr) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sending signal to pid " + processId + " as user " + user
+          + " for container " + containerIdStr);
+    }
+    final ContainerExecutor.Signal signal =
+        sleepDelayBeforeSigKill > 0 ? ContainerExecutor.Signal.TERM :
+            ContainerExecutor.Signal.KILL;
+
+    // The outcome is only reported in the debug log; it does not influence
+    // whether the delayed KILL below is scheduled.
+    boolean result = sendSignal(user, processId, signal);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sent signal " + signal + " to pid " + processId + " as user "
+          + user + " for container " + containerIdStr + ", result="
+          + (result ? "success" : "failed"));
+    }
+    if (sleepDelayBeforeSigKill > 0) {
+      new ContainerExecutor.DelayedProcessKiller(container, user, processId,
+          sleepDelayBeforeSigKill, ContainerExecutor.Signal.KILL, 
exec).start();
+    }
+  }
+
+  /**
+   * Builds a {@code ContainerSignalContext} for this container and hands it
+   * to the executor.
+   *
+   * @param user user the signal is sent as
+   * @param processId pid of the process to signal
+   * @param signal signal to deliver
+   * @return the value returned by {@code exec.signalContainer}
+   * @throws IOException if the executor fails to signal the container
+   */
+  private boolean sendSignal(String user, String processId,
+      ContainerExecutor.Signal signal)
+      throws IOException {
+    return exec.signalContainer(
+        new ContainerSignalContext.Builder().setContainer(container)
+            .setUser(user).setPid(processId).setSignal(signal).build());
+  }
+
+  /**
+   * Asks the executor to reap the container when no pid could be obtained
+   * but a docker container was requested. The result of the reap is only
+   * written to the debug log.
+   *
+   * @param user user name; used for the log message only, the reap context
+   *             itself is built with {@code container.getUser()}
+   * @throws IOException if the executor fails to reap the container
+   */
+  private void reapDockerContainerNoPid(String user) throws IOException {
+    String containerIdStr =
+        container.getContainerTokenIdentifier().getContainerID().toString();
+    LOG.info("Unable to obtain pid, but docker container request detected. "
+        + "Attempting to reap container " + containerIdStr);
+    boolean result = exec.reapContainer(
+        new ContainerReapContext.Builder()
+            .setContainer(container)
+            .setUser(container.getUser())
+            .build());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sent signal to docker container " + containerIdStr
+          + " as user " + user + ", result=" + (result ? "success" : 
"failed"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ce70e12/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 3fa73ec..6776836 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -70,7 +70,6 @@ import 
org.apache.hadoop.yarn.exceptions.ConfigurationException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import 
org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -85,7 +84,6 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
-import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import 
org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
@@ -115,7 +113,7 @@ public class ContainerLaunch implements Callable<Integer> {
   public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
 
   private static final String PID_FILE_NAME_FMT = "%s.pid";
-  private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
+  static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
 
   protected final Dispatcher dispatcher;
   protected final ContainerExecutor exec;
@@ -131,7 +129,6 @@ public class ContainerLaunch implements Callable<Integer> {
   protected AtomicBoolean completed = new AtomicBoolean(false);
 
   private volatile boolean killedBeforeStart = false;
-  private long sleepDelayBeforeSigKill = 250;
   private long maxKillWaitTime = 2000;
 
   protected Path pidFilePath = null;
@@ -152,9 +149,6 @@ public class ContainerLaunch implements Callable<Integer> {
     this.dispatcher = dispatcher;
     this.dirsHandler = dirsHandler;
     this.containerManager = containerManager;
-    this.sleepDelayBeforeSigKill =
-        conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
-            YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
     this.maxKillWaitTime =
         conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
             YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
@@ -515,6 +509,25 @@ public class ContainerLaunch implements Callable<Integer> {
     return launchPrep;
   }
 
+  /**
+   * Reaps this launch's container through the executor and then calls
+   * {@code cleanupContainerFiles} on the container work directory. Both
+   * steps run while {@code containerExecLock} is held.
+   *
+   * @throws IOException if the executor reports that the reap did not
+   *         succeed, or if one of the calls made here throws it
+   */
+  void reapContainer() throws IOException {
+    containerExecLock.lock();
+    try {
+      ContainerReapContext reapContext = new ContainerReapContext.Builder()
+          .setContainer(container)
+          .setUser(container.getUser())
+          .build();
+      boolean reaped = exec.reapContainer(reapContext);
+      if (!reaped) {
+        throw new IOException("Reap container failed for container " +
+            container.getContainerId());
+      }
+      // Reached only when the reap succeeded.
+      cleanupContainerFiles(getContainerWorkDir());
+    } finally {
+      containerExecLock.unlock();
+    }
+  }
+
   protected int prepareForLaunch(ContainerStartContext ctx) throws IOException 
{
     ContainerId containerId = container.getContainerId();
     if (container.isMarkedForKilling()) {
@@ -721,121 +734,6 @@ public class ContainerLaunch implements Callable<Integer> 
{
     return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
         + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
   }
-  
-  /**
-   * Cleanup the container.
-   * Cancels the launch if launch has not started yet or signals
-   * the executor to not execute the process if not already done so.
-   * Also, sends a SIGTERM followed by a SIGKILL to the process if
-   * the process id is available.
-   * @throws IOException
-   */
-  public void cleanupContainer() throws IOException {
-    ContainerId containerId = container.getContainerId();
-    String containerIdStr = containerId.toString();
-    LOG.info("Cleaning up container " + containerIdStr);
-
-    try {
-      context.getNMStateStore().storeContainerKilled(containerId);
-    } catch (IOException e) {
-      LOG.error("Unable to mark container " + containerId
-          + " killed in store", e);
-    }
-
-    // launch flag will be set to true if process already launched
-    boolean alreadyLaunched =
-        !containerAlreadyLaunched.compareAndSet(false, true);
-    if (!alreadyLaunched) {
-      LOG.info("Container " + containerIdStr + " not launched."
-          + " No cleanup needed to be done");
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Marking container " + containerIdStr + " as inactive");
-    }
-    // this should ensure that if the container process has not launched 
-    // by this time, it will never be launched
-    exec.deactivateContainer(containerId);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Getting pid for container " + containerIdStr + " to kill"
-          + " from pid file " 
-          + (pidFilePath != null ? pidFilePath.toString() : "null"));
-    }
-    
-    // however the container process may have already started
-    try {
-
-      // get process id from pid file if available
-      // else if shell is still active, get it from the shell
-      String processId = null;
-      if (pidFilePath != null) {
-        processId = getContainerPid(pidFilePath);
-      }
-
-      // kill process
-      String user = container.getUser();
-      if (processId != null) {
-        signalProcess(processId, user, containerIdStr);
-      } else {
-        // Normally this means that the process was notified about
-        // deactivateContainer above and did not start.
-        // Since we already set the state to RUNNING or REINITIALIZING
-        // we have to send a killed event to continue.
-        if (!completed.get()) {
-          LOG.warn("Container clean up before pid file created "
-              + containerIdStr);
-          dispatcher.getEventHandler().handle(
-              new ContainerExitEvent(container.getContainerId(),
-                  ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
-                  Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
-                      ExitCode.TERMINATED.getExitCode(),
-                  "Container terminated before pid file created."));
-          // There is a possibility that the launch grabbed the file name 
before
-          // the deactivateContainer above but it was slow enough to avoid
-          // getContainerPid.
-          // Increasing YarnConfiguration.NM_PROCESS_KILL_WAIT_MS
-          // reduces the likelihood of this race condition and process leak.
-        }
-        // The Docker container may not have fully started, reap the container.
-        if (DockerLinuxContainerRuntime.isDockerContainerRequested(
-            conf,
-            container.getLaunchContext().getEnvironment())) {
-          reapDockerContainerNoPid(user);
-        }
-      }
-    } catch (Exception e) {
-      String message =
-          "Exception when trying to cleanup container " + containerIdStr
-              + ": " + StringUtils.stringifyException(e);
-      LOG.warn(message);
-      dispatcher.getEventHandler().handle(
-        new ContainerDiagnosticsUpdateEvent(containerId, message));
-    } finally {
-      // cleanup pid file if present
-      if (pidFilePath != null) {
-        FileContext lfs = FileContext.getLocalFSFileContext();
-        lfs.delete(pidFilePath, false);
-        lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
-      }
-    }
-    containerExecLock.lock();
-    try {
-      // Reap the container
-      boolean result = exec.reapContainer(
-          new ContainerReapContext.Builder()
-              .setContainer(container)
-              .setUser(container.getUser())
-              .build());
-      if (!result) {
-        throw new IOException("Reap container failed for container "
-            + containerIdStr);
-      }
-      cleanupContainerFiles(getContainerWorkDir());
-    } finally {
-      containerExecLock.unlock();
-    }
-  }
 
   /**
    * Send a signal to the container.
@@ -874,11 +772,7 @@ public class ContainerLaunch implements Callable<Integer> {
     try {
       // get process id from pid file if available
       // else if shell is still active, get it from the shell
-      String processId = null;
-      if (pidFilePath != null) {
-        processId = getContainerPid(pidFilePath);
-      }
-
+      String processId = getContainerPid();
       if (processId != null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Sending signal to pid " + processId
@@ -912,50 +806,6 @@ public class ContainerLaunch implements Callable<Integer> {
     }
   }
 
-  private boolean sendSignal(String user, String processId, Signal signal)
-      throws IOException {
-    return exec.signalContainer(
-        new ContainerSignalContext.Builder().setContainer(container)
-            .setUser(user).setPid(processId).setSignal(signal).build());
-  }
-
-  private void signalProcess(String processId, String user,
-      String containerIdStr) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Sending signal to pid " + processId + " as user " + user
-          + " for container " + containerIdStr);
-    }
-    final Signal signal =
-        sleepDelayBeforeSigKill > 0 ? Signal.TERM : Signal.KILL;
-
-    boolean result = sendSignal(user, processId, signal);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Sent signal " + signal + " to pid " + processId + " as user "
-          + user + " for container " + containerIdStr + ", result="
-          + (result ? "success" : "failed"));
-    }
-    if (sleepDelayBeforeSigKill > 0) {
-      new DelayedProcessKiller(container, user, processId,
-          sleepDelayBeforeSigKill, Signal.KILL, exec).start();
-    }
-  }
-
-  private void reapDockerContainerNoPid(String user) throws IOException {
-    String containerIdStr =
-        container.getContainerTokenIdentifier().getContainerID().toString();
-    LOG.info("Unable to obtain pid, but docker container request detected. "
-            + "Attempting to reap container " + containerIdStr);
-    boolean result = exec.reapContainer(
-        new ContainerReapContext.Builder()
-            .setContainer(container)
-            .setUser(container.getUser())
-            .build());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Sent signal to docker container " + containerIdStr
-          + " as user " + user + ", result=" + (result ? "success" : 
"failed"));
-    }
-  }
-
   @VisibleForTesting
   public static Signal translateCommandToSignal(
       SignalContainerCommand command) {
@@ -1076,14 +926,16 @@ public class ContainerLaunch implements 
Callable<Integer> {
   /**
    * Loop through for a time-bounded interval waiting to
    * read the process id from a file generated by a running process.
-   * @param pidFilePath File from which to read the process id
-   * @return Process ID
+   * @return Process ID; null when pidFilePath is null
    * @throws Exception
    */
-  private String getContainerPid(Path pidFilePath) throws Exception {
+  String getContainerPid() throws Exception {
+    if (pidFilePath == null) {
+      return null;
+    }
     String containerIdStr = 
         container.getContainerId().toString();
-    String processId = null;
+    String processId;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Accessing pid for container " + containerIdStr
           + " from pid file " + pidFilePath);
@@ -1889,4 +1741,28 @@ public class ContainerLaunch implements 
Callable<Integer> {
       LOG.warn("Failed to delete " + path, e);
     }
   }
+
+  /**
+   * Returns the pid file path held by this launch.
+   *
+   * @return the current value of {@code pidFilePath}; may be null, since
+   *         the field starts out as null
+   */
+  Path getPidFilePath() {
+    return pidFilePath;
+  }
+
+  /**
+   * Atomically switches the {@code containerAlreadyLaunched} flag from
+   * false to true.
+   *
+   * @return true if this call changed the flag, i.e. it was false before;
+   *         false if the flag had already been set to true
+   */
+  boolean markLaunched() {
+    return containerAlreadyLaunched.compareAndSet(false, true);
+  }
+
+  /**
+   * Reports whether the launch is completed.
+   *
+   * @return the current value of the {@code completed} flag
+   */
+  boolean isLaunchCompleted() {
+    return completed.get();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ce70e12/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 7870f86..fdfe5b1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.slf4j.Logger;
@@ -154,8 +153,8 @@ public class ContainersLauncher extends AbstractService
         break;
       case CLEANUP_CONTAINER:
       case CLEANUP_CONTAINER_FOR_REINIT:
-        ContainerLaunch launcher = running.remove(containerId);
-        if (launcher == null) {
+        ContainerLaunch existingLaunch = running.remove(containerId);
+        if (existingLaunch == null) {
           // Container not launched.
           // triggering KILLING to CONTAINER_CLEANEDUP_AFTER_KILL transition.
           dispatcher.getEventHandler().handle(
@@ -169,12 +168,9 @@ public class ContainersLauncher extends AbstractService
 
         // Cleanup a container whether it is running/killed/completed, so that
         // no sub-processes are alive.
-        try {
-          launcher.cleanupContainer();
-        } catch (IOException e) {
-          LOG.warn("Got exception while cleaning container " + containerId
-              + ". Ignoring.");
-        }
+        ContainerCleanup cleanup = new ContainerCleanup(context, getConfig(),
+            dispatcher, exec, event.getContainer(), existingLaunch);
+        containerLauncher.submit(cleanup);
         break;
       case SIGNAL_CONTAINER:
         SignalContainersLauncherEvent signalEvent =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ce70e12/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerCleanup.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerCleanup.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerCleanup.java
new file mode 100644
index 0000000..6c99379
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerCleanup.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.hadoop.yarn.conf.YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link ContainerCleanup}.
+ */
+public class TestContainerCleanup {
+
+  private YarnConfiguration conf;
+  private ContainerId containerId;
+  private ContainerExecutor executor;
+  private ContainerLaunch launch;
+  private ContainerCleanup cleanup;
+
+  /**
+   * Builds a {@link ContainerCleanup} around mocked collaborators. The
+   * launch mock starts in the "not launched" state; individual tests flip
+   * {@code containerAlreadyLaunched} to simulate a launched container.
+   */
+  @Before
+  public void setup() throws Exception {
+    conf = new YarnConfiguration();
+    // Positive delay, so the first signal sent by the cleanup is TERM.
+    conf.setLong(NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 60000);
+    Context context = mock(Context.class);
+    NMStateStoreService storeService = mock(NMStateStoreService.class);
+    when(context.getNMStateStore()).thenReturn(storeService);
+
+    Dispatcher dispatcher = new InlineDispatcher();
+    executor = mock(ContainerExecutor.class);
+    when(executor.signalContainer(Mockito.any(
+        ContainerSignalContext.class))).thenReturn(true);
+
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
+        1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    containerId = ContainerId.newContainerId(attemptId, 1);
+    Container container = mock(Container.class);
+
+    when(container.getContainerId()).thenReturn(containerId);
+
+    launch = mock(ContainerLaunch.class);
+    launch.containerAlreadyLaunched = new AtomicBoolean(false);
+    // ContainerCleanup decides what to do based on markLaunched(). On a
+    // plain mock that call always returns false, regardless of the field
+    // above, so delegate to the real method to make the flag effective.
+    when(launch.markLaunched()).thenCallRealMethod();
+
+    launch.pidFilePath = new Path("target/" + containerId.toString() + ".pid");
+    when(launch.getContainerPid()).thenReturn(containerId.toString());
+
+    cleanup = new ContainerCleanup(context, conf, dispatcher, executor,
+        container, launch);
+  }
+
+  /**
+   * A container that was never launched must not be deactivated or
+   * signalled by the cleanup.
+   */
+  @Test
+  public void testNoCleanupWhenContainerNotLaunched() throws IOException {
+    cleanup.run();
+    verify(launch, Mockito.times(0)).signalContainer(
+        Mockito.any(SignalContainerCommand.class));
+    // The cleanup signals through the executor, so that is what has to
+    // stay untouched when it returns early.
+    verify(executor, Mockito.times(0)).deactivateContainer(containerId);
+    verify(executor, Mockito.times(0)).signalContainer(
+        Mockito.any(ContainerSignalContext.class));
+  }
+
+  /**
+   * A launched container with a known pid receives exactly one TERM
+   * signal through the executor.
+   */
+  @Test
+  public void testCleanup() throws Exception {
+    launch.containerAlreadyLaunched.set(true);
+    cleanup.run();
+    ArgumentCaptor<ContainerSignalContext> captor =
+        ArgumentCaptor.forClass(ContainerSignalContext.class);
+
+    verify(executor, Mockito.times(1)).signalContainer(captor.capture());
+    Assert.assertEquals("signal", ContainerExecutor.Signal.TERM,
+        captor.getValue().getSignal());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ce70e12/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java
index 0234ac2..f2fafd2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainersLauncher.java
@@ -192,12 +192,14 @@ public class TestContainersLauncher {
         .synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
     dummyMap.put(containerId, containerLaunch);
     Whitebox.setInternalState(spy, "running", dummyMap);
+
     when(event.getType())
         .thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER);
-    doNothing().when(containerLaunch).cleanupContainer();
+    assertEquals(1, dummyMap.size());
     spy.handle(event);
     assertEquals(0, dummyMap.size());
-    Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer();
+    Mockito.verify(containerLauncher, Mockito.times(1))
+        .submit(Mockito.any(ContainerCleanup.class));
   }
 
   @Test
@@ -207,12 +209,14 @@ public class TestContainersLauncher {
         .synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
     dummyMap.put(containerId, containerLaunch);
     Whitebox.setInternalState(spy, "running", dummyMap);
+
     when(event.getType())
         .thenReturn(ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT);
-    doNothing().when(containerLaunch).cleanupContainer();
+    assertEquals(1, dummyMap.size());
     spy.handle(event);
     assertEquals(0, dummyMap.size());
-    Mockito.verify(containerLaunch, Mockito.times(1)).cleanupContainer();
+    Mockito.verify(containerLauncher, Mockito.times(1))
+        .submit(Mockito.any(ContainerCleanup.class));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to