Updated the io switchboard to launch an external io switchboard server.

We don't currently handle recovering access to the io switchboard
server process after agent restarts. We will add that in a subsequent
commit.

Review: https://reviews.apache.org/r/54148


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a13b70ac
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a13b70ac
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a13b70ac

Branch: refs/heads/master
Commit: a13b70ac5f5068a185232e050ab1e5c1bf4946dd
Parents: 4223786
Author: Kevin Klues <klue...@gmail.com>
Authored: Mon Nov 28 21:03:32 2016 -0800
Committer: Jie Yu <yujie....@gmail.com>
Committed: Thu Dec 1 10:11:45 2016 -0800

----------------------------------------------------------------------
 src/Makefile.am                                 |   6 +
 .../containerizer/mesos/io/switchboard.cpp      | 257 +++++++++++++++++--
 .../containerizer/mesos/io/switchboard.hpp      |  15 ++
 .../containerizer/mesos/io/switchboard_main.cpp |  63 +++++
 4 files changed, 315 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a13b70ac/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 4166274..9177ea6 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1389,6 +1389,12 @@ mesos_logrotate_logger_SOURCES =         \
 mesos_logrotate_logger_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_logrotate_logger_LDADD = libmesos.la $(LDADD)
 
+pkglibexec_PROGRAMS += mesos-io-switchboard
+mesos_io_switchboard_SOURCES = \
+  slave/containerizer/mesos/io/switchboard_main.cpp
+mesos_io_switchboard_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_io_switchboard_LDADD = libmesos.la $(LDADD)
+
 if WITH_NETWORK_ISOLATOR
 pkglibexec_PROGRAMS += mesos-network-helper
 mesos_network_helper_SOURCES = 
slave/containerizer/mesos/isolators/network/helper.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/a13b70ac/src/slave/containerizer/mesos/io/switchboard.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp 
b/src/slave/containerizer/mesos/io/switchboard.cpp
index 5751853..3c031e3 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -14,7 +14,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <map>
 #include <string>
+#include <vector>
 
 #include <process/collect.hpp>
 #include <process/defer.hpp>
@@ -23,10 +25,26 @@
 #include <process/io.hpp>
 #include <process/owned.hpp>
 
+#include <process/process.hpp>
+#include <process/reap.hpp>
+#include <process/subprocess.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+
+#include <mesos/type_utils.hpp>
+
 #include <mesos/agent/agent.hpp>
 
 #include <mesos/slave/container_logger.hpp>
 
+#include "slave/flags.hpp"
+#include "slave/state.hpp"
+
+#include "slave/containerizer/mesos/paths.hpp"
+
 #include "slave/containerizer/mesos/io/switchboard.hpp"
 
 namespace http = process::http;
@@ -43,6 +61,11 @@ using process::Owned;
 using process::PID;
 using process::Process;
 using process::Promise;
+using process::Subprocess;
+
+using std::map;
+using std::string;
+using std::vector;
 
 using mesos::slave::ContainerConfig;
 using mesos::slave::ContainerIO;
@@ -113,45 +136,227 @@ Future<Option<ContainerLaunchInfo>> 
IOSwitchboard::prepare(
     .then(defer(
         PID<IOSwitchboard>(this),
         &IOSwitchboard::_prepare,
+        containerId,
         lambda::_1));
 }
 
 
 Future<Option<ContainerLaunchInfo>> IOSwitchboard::_prepare(
+    const ContainerID& containerId,
     const ContainerLogger::SubprocessInfo& loggerInfo)
 {
-  ContainerLaunchInfo launchInfo;
+  // On windows, we do not yet support running an io switchboard
+  // server, so we must error out if the agent
+  // `io_switchboard_enable_server` flag is enabled.
+#ifdef __WINDOWS__
+  if (flags.io_switchboard_enable_server) {
+      return Failure(
+          "Setting the agent flag"
+          " '--io_switchboard_enable_server=true'"
+          " is not supported on windows");
+  }
+#endif
+
+  if (!flags.io_switchboard_enable_server) {
+    ContainerLaunchInfo launchInfo;
+
+    ContainerIO* out = launchInfo.mutable_out();
+    ContainerIO* err = launchInfo.mutable_err();
+
+    switch (loggerInfo.out.type()) {
+      case ContainerLogger::SubprocessInfo::IO::Type::FD:
+        out->set_type(ContainerIO::FD);
+        out->set_fd(loggerInfo.out.fd().get());
+        break;
+      case ContainerLogger::SubprocessInfo::IO::Type::PATH:
+        out->set_type(ContainerIO::PATH);
+        out->set_path(loggerInfo.out.path().get());
+        break;
+      default:
+        UNREACHABLE();
+    }
+
+    switch (loggerInfo.err.type()) {
+      case ContainerLogger::SubprocessInfo::IO::Type::FD:
+        err->set_type(ContainerIO::FD);
+        err->set_fd(loggerInfo.err.fd().get());
+        break;
+      case ContainerLogger::SubprocessInfo::IO::Type::PATH:
+        err->set_type(ContainerIO::PATH);
+        err->set_path(loggerInfo.err.path().get());
+        break;
+      default:
+        UNREACHABLE();
+    }
+
+    return launchInfo;
+  }
+
+#ifndef __WINDOWS__
+  // First make sure that we haven't already spawned an io
+  // switchboard server for this container.
+  if (infos.contains(containerId)) {
+    return Failure("Already prepared io switchboard server for container"
+                   " '" + stringify(containerId) + "'");
+  }
+
+  // Manually construct pipes instead of using `Subprocess::PIPE`
+  // so that the ownership of the FDs is properly represented. The
+  // `Subprocess` spawned below owns one end of each pipe and will
+  // be solely responsible for closing that end. The ownership of
+  // the other end will be passed to the caller of this function
+  // and eventually passed to the container being launched.
+  int infds[2];
+  int outfds[2];
+  int errfds[2];
+
+  // A list of file decriptors we've opened so far.
+  vector<int> fds = {};
+
+  // Helper for closing the list of file
+  // descriptors we've opened so far.
+  auto close = [](const vector<int>& fds) {
+    foreach (int fd, fds) {
+      os::close(fd);
+    }
+  };
+
+  Try<Nothing> pipe = os::pipe(infds);
+  if (pipe.isError()) {
+    close(fds);
+    return Failure("Failed to create stdin pipe: " + pipe.error());
+  }
+
+  fds.push_back(infds[0]);
+  fds.push_back(infds[1]);
+
+  pipe = os::pipe(outfds);
+  if (pipe.isError()) {
+    close(fds);
+    return Failure("Failed to create stdout pipe: " + pipe.error());
+  }
+
+  fds.push_back(outfds[0]);
+  fds.push_back(outfds[1]);
+
+  pipe = os::pipe(errfds);
+  if (pipe.isError()) {
+    close(fds);
+    return Failure("Failed to create stderr pipe: " + pipe.error());
+  }
+
+  fds.push_back(errfds[0]);
+  fds.push_back(errfds[1]);
 
-  ContainerIO* out = launchInfo.mutable_out();
-  ContainerIO* err = launchInfo.mutable_err();
-
-  switch (loggerInfo.out.type()) {
-    case ContainerLogger::SubprocessInfo::IO::Type::FD:
-      out->set_type(ContainerIO::FD);
-      out->set_fd(loggerInfo.out.fd().get());
-      break;
-    case ContainerLogger::SubprocessInfo::IO::Type::PATH:
-      out->set_type(ContainerIO::PATH);
-      out->set_path(loggerInfo.out.path().get());
-      break;
-    default:
-      UNREACHABLE();
+  Try<Nothing> cloexec = os::cloexec(infds[0]);
+  if (cloexec.isError()) {
+    close(fds);
+    return Failure("Failed to cloexec infds.read: " + cloexec.error());
   }
 
-  switch (loggerInfo.err.type()) {
-    case ContainerLogger::SubprocessInfo::IO::Type::FD:
-      err->set_type(ContainerIO::FD);
-      err->set_fd(loggerInfo.err.fd().get());
-      break;
-    case ContainerLogger::SubprocessInfo::IO::Type::PATH:
-      err->set_type(ContainerIO::PATH);
-      err->set_path(loggerInfo.err.path().get());
-      break;
-    default:
-      UNREACHABLE();
+  cloexec = os::cloexec(outfds[1]);
+  if (cloexec.isError()) {
+    close(fds);
+    return Failure("Failed to cloexec outfds.write: " + cloexec.error());
   }
 
+  cloexec = os::cloexec(errfds[1]);
+  if (cloexec.isError()) {
+    close(fds);
+    return Failure("Failed to cloexec errfds.write: " + cloexec.error());
+  }
+
+  // Set up our flags to send to the io switchboard server process.
+  IOSwitchboardServerFlags switchboardFlags;
+  switchboardFlags.stdin_to_fd = infds[1];
+  switchboardFlags.stdout_from_fd = outfds[0];
+  switchboardFlags.stdout_to_fd = STDOUT_FILENO;
+  switchboardFlags.stderr_from_fd = errfds[0];
+  switchboardFlags.stderr_to_fd = STDERR_FILENO;
+  switchboardFlags.socket_path = path::join(
+      stringify(os::PATH_SEPARATOR),
+      "tmp",
+      "mesos-io-switchboard-" + UUID::random().toString());
+
+  // Launch the io switchboard server process.
+  // We `dup()` the `stdout` and `stderr` passed to us by the
+  // container logger over the `stdout` and `stderr` of the io
+  // switchboard process itself. In this way, the io switchboard
+  // process simply needs to write to its own `stdout` and
+  // `stderr` in order to send output to the logger files.
+  Try<Subprocess> child = subprocess(
+      path::join(flags.launcher_dir, IOSwitchboardServer::NAME),
+      {IOSwitchboardServer::NAME},
+      Subprocess::PATH("/dev/null"),
+      loggerInfo.out,
+      loggerInfo.err,
+      &switchboardFlags,
+      map<string, string>(),
+      None(),
+      {},
+      {Subprocess::ChildHook::SETSID()});
+
+  if (child.isError()) {
+    close(fds);
+    return Failure("Failed to create io switchboard"
+                   " server process: " + child.error());
+  }
+
+  os::close(infds[1]);
+  os::close(outfds[0]);
+  os::close(errfds[0]);
+
+  // Build an info struct for this container.
+  infos[containerId] = Owned<Info>(new Info(
+    child->pid(),
+    process::reap(child->pid())));
+
+  // Return the set of fds that should be sent to the
+  // container and dup'd onto its stdin/stdout/stderr.
+  ContainerLaunchInfo launchInfo;
+
+  launchInfo.mutable_in()->set_type(ContainerIO::FD);
+  launchInfo.mutable_in()->set_fd(infds[0]);
+
+  launchInfo.mutable_out()->set_type(ContainerIO::FD);
+  launchInfo.mutable_out()->set_fd(outfds[1]);
+
+  launchInfo.mutable_err()->set_type(ContainerIO::FD);
+  launchInfo.mutable_err()->set_fd(errfds[1]);
+
   return launchInfo;
+#endif // __WINDOWS__
+}
+
+
+Future<Nothing> IOSwitchboard::cleanup(
+    const ContainerID& containerId)
+{
+#ifdef __WINDOWS__
+  // Since we don't support spawning an io switchboard server on
+  // windows yet, there is nothing to wait for here.
+  return Nothing();
+#else
+  // We don't particularly care if the process gets reaped or not (it
+  // will clean itself up automatically upon process exit). We just
+  // try to wait for it to exit if we can. For now there is no need
+  // to recover info about a container's IOSwitchboard across agent
+  // restarts (so it's OK to simply return `Nothing()` if we don't
+  // know about a containerId).
+  //
+  // TODO(klueska): Add the ability to recover the `IOSwitchboard`'s
+  // pid and reap it so we can properly return its status here.
+  if (local || !infos.contains(containerId)) {
+    return Nothing();
+  }
+
+  Future<Option<int>> status = infos[containerId]->status;
+
+  infos.erase(containerId);
+
+  return status
+    .then([]() { return Nothing(); });
+#endif // __WINDOWS__
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a13b70ac/src/slave/containerizer/mesos/io/switchboard.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.hpp 
b/src/slave/containerizer/mesos/io/switchboard.hpp
index 5cf3bcd..679f520 100644
--- a/src/slave/containerizer/mesos/io/switchboard.hpp
+++ b/src/slave/containerizer/mesos/io/switchboard.hpp
@@ -63,18 +63,33 @@ public:
       const ContainerID& containerId,
       const mesos::slave::ContainerConfig& containerConfig);
 
+  virtual process::Future<Nothing> cleanup(
+      const ContainerID& containerId);
+
 private:
+  struct Info
+  {
+    Info(pid_t _pid, const process::Future<Option<int>>& _status)
+      : pid(_pid),
+        status(_status) {}
+
+    pid_t pid;
+    process::Future<Option<int>> status;
+  };
+
   IOSwitchboard(
       const Flags& flags,
       bool local,
       process::Owned<mesos::slave::ContainerLogger> logger);
 
   process::Future<Option<mesos::slave::ContainerLaunchInfo>> _prepare(
+      const ContainerID& containerId,
       const mesos::slave::ContainerLogger::SubprocessInfo& loggerInfo);
 
   Flags flags;
   bool local;
   process::Owned<mesos::slave::ContainerLogger> logger;
+  hashmap<ContainerID, process::Owned<Info>> infos;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a13b70ac/src/slave/containerizer/mesos/io/switchboard_main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard_main.cpp 
b/src/slave/containerizer/mesos/io/switchboard_main.cpp
new file mode 100644
index 0000000..79552b9
--- /dev/null
+++ b/src/slave/containerizer/mesos/io/switchboard_main.cpp
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/try.hpp>
+
+#include "slave/containerizer/mesos/io/switchboard.hpp"
+
+using namespace mesos::internal::slave;
+
+using process::Future;
+using process::Owned;
+
+int main(int argc, char** argv)
+{
+  IOSwitchboardServerFlags flags;
+
+  // Load and validate flags from the environment and command line.
+  Try<flags::Warnings> load = flags.load(None(), &argc, &argv);
+
+  if (load.isError()) {
+    EXIT(EXIT_FAILURE) << flags.usage(load.error());
+  }
+
+  Try<Owned<IOSwitchboardServer>> server = IOSwitchboardServer::create(
+      flags.stdin_to_fd,
+      flags.stdout_from_fd,
+      flags.stdout_to_fd,
+      flags.stderr_from_fd,
+      flags.stderr_to_fd,
+      flags.socket_path);
+
+  if (server.isError()) {
+    EXIT(EXIT_FAILURE) << "Failed to create the io switchboard server:"
+                          " " << server.error();
+  }
+
+  Future<Nothing> run = server.get()->run();
+  run.await();
+
+  if (!run.isReady()) {
+    EXIT(EXIT_FAILURE) << "The io switchboard server failed:"
+                       << " " << run.isFailed() ? run.failure()
+                                                : "discarded";
+  }
+
+  return EXIT_SUCCESS;
+}

Reply via email to