Reaped Docker executor only when it can be connected.

Review: https://reviews.apache.org/r/65382


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cb523f7b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cb523f7b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cb523f7b

Branch: refs/heads/1.3.x
Commit: cb523f7b398cf3ef899301c4ab12e524e37f4fc3
Parents: 00e17eb
Author: Qian Zhang <zhq527...@gmail.com>
Authored: Sun Jan 28 20:57:23 2018 +0800
Committer: Qian Zhang <zhq527...@gmail.com>
Committed: Mon Feb 5 08:37:00 2018 +0800

----------------------------------------------------------------------
 src/slave/containerizer/docker.cpp | 50 ++++++++++++++++++++++++++++-----
 1 file changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cb523f7b/src/slave/containerizer/docker.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/docker.cpp 
b/src/slave/containerizer/docker.cpp
index c300584..d839124 100644
--- a/src/slave/containerizer/docker.cpp
+++ b/src/slave/containerizer/docker.cpp
@@ -28,6 +28,7 @@
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/io.hpp>
+#include <process/network.hpp>
 #include <process/owned.hpp>
 #include <process/reap.hpp>
 #include <process/subprocess.hpp>
@@ -982,12 +983,14 @@ Future<Nothing> DockerContainerizerProcess::_recover(
         CHECK_SOME(run.get().id);
         CHECK_EQ(containerId, run.get().id.get());
 
-        // We need the pid so the reaper can monitor the executor so
-        // skip this executor if it's not present. This is not an
-        // error because the slave will try to wait on the container
-        // which will return a failed 'ContainerTermination' and
-        // everything will get cleaned up.
-        if (!run.get().forkedPid.isSome()) {
+        // We need the pid so the reaper can monitor the executor so skip this
+        // executor if it's not present. We will also skip this executor if the
+        // libprocess pid is not present which means the slave exited before
+        // checkpointing it, in which case the executor will shutdown itself
+        // immediately. Both of these two cases are safe to skip because the
+        // slave will try to wait on the container which will return `None()`
+        // and everything will get cleaned up.
+        if (run.get().forkedPid.isNone() || run.get().libprocessPid.isNone()) {
           continue;
         }
 
@@ -1030,9 +1033,42 @@ Future<Nothing> DockerContainerizerProcess::_recover(
         container->launchesExecutorContainer =
           executorContainers.contains(containerId);
 
+        // Only reap the executor process if the executor can be connected
+        // otherwise just set `container->status` to `None()`. This is to
+        // avoid reaping an irrelevant process, e.g., after the agent host is
+        // rebooted, the executor pid happens to be reused by another process.
+        // See MESOS-8125 for details.
+        // Note that if both the pid and the port of the executor are reused
+        // by another process or two processes respectively after the agent
+        // host reboots we will still reap an irrelevant process, but that
+        // should be highly unlikely.
         pid_t pid = run.get().forkedPid.get();
 
-        container->status.set(process::reap(pid));
+        // Create a TCP socket.
+        int_fd socket = ::socket(AF_INET, SOCK_STREAM, 0);
+        if (socket < 0) {
+          return Failure(
+              "Failed to create socket for connecting to executor '" +
+              stringify(executor.id) + "': " + os::strerror(errno));
+        }
+
+        Try<Nothing, SocketError> connect = process::network::connect(
+            socket,
+            run.get().libprocessPid->address);
+
+        if (connect.isSome()) {
+          container->status.set(process::reap(pid));
+        } else {
+          LOG(WARNING) << "Failed to connect to executor '" << executor.id
+                       << "' of framework " << framework.id << ": "
+                       << connect.error().message;
+
+          container->status.set(Future<Option<int>>(None()));
+        }
+
+        // Shutdown and close the socket.
+        shutdown(socket, SHUT_RDWR);
+        close(socket);
 
         container->status.future().get()
           .onAny(defer(self(), &Self::reaped, containerId));

Reply via email to