Added a server side component for the IOSwitchboard.

The 'IOSwitchboardServer' component encapsulates the server side logic
for redirecting the 'stdin/stdout/stderr' of a container to/from
multiple sources/targets. For now, we only redirect IO from a
container to the FDs supplied to us by the logger.  We also send the
stdout/stderr data to a simple HTTP server that we launch on a unix
domain socket set up by the agent.  Right now this server is just a
stub and doesn't do anything useful.

In future commits, we will expand this HTTP server to handle
'ATTACH_CONTAINER_INPUT' and 'ATTACH_CONTAINER_OUTPUT' calls on behalf
of a container. It will use the stdout/stderr messages passed to it to
and send that data over the response stream to any clients connected
with an 'ATTACH_CONTAINER_OUTPUT' call. Likewise, it will take any
input streamed in over a 'ATTACH_CONTAINER_INPUT' request and write it
to a container's stdin.

In 'local' mode, it will be run inside the agent itself. In
'non-local' mode, it will be run as an external process to survive
agent restarts.

Review: https://reviews.apache.org/r/54147


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4223786a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4223786a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4223786a

Branch: refs/heads/master
Commit: 4223786a31eaabdd82bd9fa4ca174ae140774805
Parents: b7937a6
Author: Kevin Klues <klue...@gmail.com>
Authored: Mon Nov 28 20:53:18 2016 -0800
Committer: Jie Yu <yujie....@gmail.com>
Committed: Thu Dec 1 10:11:45 2016 -0800

----------------------------------------------------------------------
 src/Makefile.am                                 |   1 +
 .../containerizer/mesos/io/switchboard.cpp      | 246 +++++++++++++++++++
 .../containerizer/mesos/io/switchboard.hpp      | 112 ++++++++-
 .../containerizer/io_switchboard_tests.cpp      | 133 ++++++++++
 4 files changed, 489 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4223786a/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7750ed7..4166274 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2190,6 +2190,7 @@ mesos_tests_SOURCES =                                     
        \
   tests/containerizer/docker_containerizer_tests.cpp           \
   tests/containerizer/docker_spec_tests.cpp                    \
   tests/containerizer/docker_tests.cpp                         \
+  tests/containerizer/io_switchboard_tests.cpp                 \
   tests/containerizer/isolator_tests.cpp                       \
   tests/containerizer/launcher.cpp                             \
   tests/containerizer/memory_isolator_tests.cpp                        \

http://git-wip-us.apache.org/repos/asf/mesos/blob/4223786a/src/slave/containerizer/mesos/io/switchboard.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp 
b/src/slave/containerizer/mesos/io/switchboard.cpp
index 334133b..5751853 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -16,18 +16,33 @@
 
 #include <string>
 
+#include <process/collect.hpp>
 #include <process/defer.hpp>
+#include <process/dispatch.hpp>
 #include <process/future.hpp>
+#include <process/io.hpp>
 #include <process/owned.hpp>
 
+#include <mesos/agent/agent.hpp>
+
+#include <mesos/slave/container_logger.hpp>
+
 #include "slave/containerizer/mesos/io/switchboard.hpp"
 
+namespace http = process::http;
+
+#ifndef __WINDOWS__
+namespace unix = process::network::unix;
+#endif // __WINDOWS__
+
 using std::string;
 
 using process::Failure;
 using process::Future;
 using process::Owned;
 using process::PID;
+using process::Process;
+using process::Promise;
 
 using mesos::slave::ContainerConfig;
 using mesos::slave::ContainerIO;
@@ -139,6 +154,237 @@ Future<Option<ContainerLaunchInfo>> 
IOSwitchboard::_prepare(
   return launchInfo;
 }
 
+
+#ifndef __WINDOWS__
+constexpr char IOSwitchboardServer::NAME[];
+
+
+class IOSwitchboardServerProcess : public Process<IOSwitchboardServerProcess>
+{
+public:
+  IOSwitchboardServerProcess(
+      int _stdinToFd,
+      int _stdoutFromFd,
+      int _stdoutToFd,
+      int _stderrFromFd,
+      int _stderrToFd,
+      const unix::Socket& _socket);
+
+  Future<Nothing> run();
+
+private:
+  // Sit in an accept loop forever.
+  void acceptLoop();
+
+  // Parse the request and look for `ATTACH_CONTAINER_INPUT` and
+  // `ATTACH_CONTAINER_OUTPUT` calls. We call their corresponding
+  // handler functions once we have parsed them. We accept calls as
+  // both `APPLICATION_PROTOBUF` and `APPLICATION_JSON` and respond
+  // with the same format we receive them in.
+  Future<http::Response> handler(const http::Request& request);
+
+  // Asynchronously receive data as we read it from our
+  // `stdoutFromFd` and `stdoutFromFd` file descriptors.
+  void outputHook(
+      const string& data,
+      const agent::ProcessIO::Data::Type& type);
+
+  int stdinToFd;
+  int stdoutFromFd;
+  int stdoutToFd;
+  int stderrFromFd;
+  int stderrToFd;
+  unix::Socket socket;
+  Promise<Nothing> promise;
+};
+
+
+Try<Owned<IOSwitchboardServer>> IOSwitchboardServer::create(
+    int stdinToFd,
+    int stdoutFromFd,
+    int stdoutToFd,
+    int stderrFromFd,
+    int stderrToFd,
+    const string& socketPath)
+{
+  Try<unix::Socket> socket = unix::Socket::create();
+  if (socket.isError()) {
+    return Error("Failed to create socket: " + socket.error());
+  }
+
+  Try<unix::Address> address = unix::Address::create(socketPath);
+  if (address.isError()) {
+    return Error("Failed to build address from '" + socketPath + "':"
+                 " " + address.error());
+  }
+
+  Try<unix::Address> bind = socket->bind(address.get());
+  if (bind.isError()) {
+    return Error("Failed to bind to address '" + socketPath + "':"
+                 " " + bind.error());
+  }
+
+  Try<Nothing> listen = socket->listen(64);
+  if (listen.isError()) {
+    return Error("Failed to listen on socket at address"
+                 " '" + socketPath + "': " + listen.error());
+  }
+
+  return new IOSwitchboardServer(
+      stdinToFd,
+      stdoutFromFd,
+      stdoutToFd,
+      stderrFromFd,
+      stderrToFd,
+      socket.get());
+}
+
+
+IOSwitchboardServer::IOSwitchboardServer(
+    int stdinToFd,
+    int stdoutFromFd,
+    int stdoutToFd,
+    int stderrFromFd,
+    int stderrToFd,
+    const unix::Socket& socket)
+  : process(new IOSwitchboardServerProcess(
+        stdinToFd,
+        stdoutFromFd,
+        stdoutToFd,
+        stderrFromFd,
+        stderrToFd,
+        socket))
+{
+  spawn(process.get());
+}
+
+
+IOSwitchboardServer::~IOSwitchboardServer()
+{
+  terminate(process.get());
+  process::wait(process.get());
+}
+
+
+Future<Nothing> IOSwitchboardServer::run()
+{
+  return dispatch(process.get(), &IOSwitchboardServerProcess::run);
+}
+
+
+IOSwitchboardServerProcess::IOSwitchboardServerProcess(
+    int _stdinToFd,
+    int _stdoutFromFd,
+    int _stdoutToFd,
+    int _stderrFromFd,
+    int _stderrToFd,
+    const unix::Socket& _socket)
+  : stdinToFd(_stdinToFd),
+    stdoutFromFd(_stdoutFromFd),
+    stdoutToFd(_stdoutToFd),
+    stderrFromFd(_stderrFromFd),
+    stderrToFd(_stderrToFd),
+    socket(_socket) {}
+
+
+Future<Nothing> IOSwitchboardServerProcess::run()
+{
+  Future<Nothing> stdoutRedirect = process::io::redirect(
+      stdoutFromFd,
+      stdoutToFd,
+      4096,
+      {defer(self(),
+             &Self::outputHook,
+             lambda::_1,
+             agent::ProcessIO::Data::STDOUT)});
+
+  Future<Nothing> stderrRedirect = process::io::redirect(
+      stderrFromFd,
+      stderrToFd,
+      4096,
+      {defer(self(),
+             &Self::outputHook,
+             lambda::_1,
+             agent::ProcessIO::Data::STDERR)});
+
+  // Set the future once our IO redirects finish. On failure,
+  // fail the future.
+  //
+  // For now we simply assume that whenever both `stdoutRedirect`
+  // and `stderrRedirect` have completed then it is OK to exit the
+  // switchboard process. We assume this because `stdoutRedirect`
+  // and `stderrRedirect` will only complete after both the read end
+  // of the `stdout` stream and the read end of the `stderr` stream
+  // have been drained. Since draining these `fds` represents having
+  // read everything possible from a container's `stdout` and
+  // `stderr` this is likely sufficient termination criteria.
+  // However, there's a non-zero chance that *some* containers may
+  // decide to close their `stdout` and `stderr` while expecting to
+  // continue reading from `stdin`. For now we don't support
+  // containers with this behavior and we will exit out of the
+  // switchboard process early.
+  //
+  // TODO(klueska): Add support to asynchronously detect when
+  // `stdinToFd` has become invalid before deciding to terminate.
+  stdoutRedirect
+    .onFailed(defer(self(), [this](const string& message) {
+       promise.fail("Failed redirecting stdout: " + message);
+    }));
+
+  stderrRedirect
+    .onFailed(defer(self(), [this](const string& message) {
+       promise.fail("Failed redirecting stderr: " + message);
+    }));
+
+  collect(stdoutRedirect, stderrRedirect)
+    .then(defer(self(), [this]() {
+      promise.set(Nothing());
+      return Nothing();
+    }));
+
+  acceptLoop();
+
+  return promise.future();
+}
+
+
+void IOSwitchboardServerProcess::acceptLoop()
+{
+  socket.accept()
+    .onAny(defer(self(), [this](const Future<unix::Socket>& socket) {
+      if (!socket.isReady()) {
+        promise.fail("Failed trying to accept connection");
+      }
+
+      // We intentionally ignore errors on the serve path, and assume
+      // that they will eventually be propagated back to the client in
+      // one form or another (e.g. a timeout on the client side). We
+      // explicitly *don't* want to kill the whole server though, just
+      // beause a single connection fails.
+      http::serve(
+          socket.get(),
+          defer(self(), &Self::handler, lambda::_1));
+
+      // Use `dispatch` to limit the size of the call stack.
+      dispatch(self(), &Self::acceptLoop);
+    }));
+}
+
+
+Future<http::Response> IOSwitchboardServerProcess::handler(
+    const http::Request& request)
+{
+  return http::BadRequest("Unsupported");
+}
+
+
+void IOSwitchboardServerProcess::outputHook(
+    const string& data,
+    const agent::ProcessIO::Data::Type& type)
+{
+}
+#endif // __WINDOWS__
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4223786a/src/slave/containerizer/mesos/io/switchboard.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.hpp 
b/src/slave/containerizer/mesos/io/switchboard.hpp
index 08cfb0a..5cf3bcd 100644
--- a/src/slave/containerizer/mesos/io/switchboard.hpp
+++ b/src/slave/containerizer/mesos/io/switchboard.hpp
@@ -17,8 +17,11 @@
 #ifndef __MESOS_CONTAINERIZER_IO_SWITCHBOARD_HPP__
 #define __MESOS_CONTAINERIZER_IO_SWITCHBOARD_HPP__
 
+#include <string>
+
 #include <process/future.hpp>
 #include <process/owned.hpp>
+#include <process/socket.hpp>
 
 #include <stout/try.hpp>
 
@@ -32,9 +35,10 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
-// The I/O switchboard is designed to feed stdin to a container from
-// an external source, as well as redirect the stdin/stdout of a
-// container to multiple targets.
+// The `IOSwitchboard` is a component in the agent whose job it is to
+// instantiate an `IOSwitchboardServer` that can be used to feed the
+// stdin to a container from an external source, as well as redirect
+// the stdin/stdout of a container to multiple targets.
 //
 // The primary motivation of this component is to enable support in
 // mesos similar to `docker attach` and `docker exec` whereby an
@@ -73,6 +77,108 @@ private:
   process::Owned<mesos::slave::ContainerLogger> logger;
 };
 
+
+#ifndef __WINDOWS__
+// The `IOSwitchboardServer` encapsulates the server side logic for
+// redirecting the `stdin/stdout/stderr` of a container to/from
+// multiple sources/targets. It runs an HTTP server over a unix domain
+// socket in order to process incoming `ATTACH_CONTAINER_INPUT` and
+// `ATTACH_CONTAINER_OUTPUT` calls and redirect a containers
+// `stdin/stdout/stderr` through them. In 'local' mode, it is run
+// inside the agent itself. In 'non-local' mode, it is run as an
+// external process to survive agent restarts.
+class IOSwitchboardServerProcess;
+
+
+class IOSwitchboardServer
+{
+public:
+  static constexpr char NAME[] = "mesos-io-switchboard";
+
+  static Try<process::Owned<IOSwitchboardServer>> create(
+      int stdinToFd,
+      int stdoutFromFd,
+      int stdoutToFd,
+      int stderrFromFd,
+      int stderrToFd,
+      const std::string& socketPath);
+
+  ~IOSwitchboardServer();
+
+  process::Future<Nothing> run();
+
+private:
+  IOSwitchboardServer(
+      int stdinToFd,
+      int stdoutFromFd,
+      int stdoutToFd,
+      int stderrFromFd,
+      int stderrToFd,
+      const process::network::unix::Socket& socket);
+
+  process::Owned<IOSwitchboardServerProcess> process;
+};
+
+
+// The set of flags to pass to the io switchboard server when launched
+// in an external binary.
+struct IOSwitchboardServerFlags : public virtual flags::FlagsBase
+{
+  IOSwitchboardServerFlags()
+  {
+    setUsageMessage(
+      "Usage: " + stringify(IOSwitchboardServer::NAME) + " [options]\n"
+      "The io switchboard server is designed to feed stdin to a container\n"
+      "from an external source, as well as redirect the stdin/stdout of a\n"
+      "container to multiple targets.\n"
+      "\n"
+      "It runs an HTTP server over a unix domain socket in order to process\n"
+      "incoming `ATTACH_CONTAINER_INPUT` and `ATTACH_CONTAINER_OUTPUT` calls\n"
+      "and redirect a containers `stdin/stdout/stderr` through them.\n"
+      "\n"
+      "The primary motivation of this component is to enable support in 
mesos\n"
+      "similar to `docker attach` and `docker exec` whereby an external\n"
+      "client can attach to the stdin/stdout/stderr of a running container 
as\n"
+      "well as launch arbitrary subcommands inside a container and attach to\n"
+      "its stdin/stdout/stderr.\n");
+
+    add(&IOSwitchboardServerFlags::stdin_to_fd,
+        "stdin_to_fd",
+        "The file descriptor where incoming stdin data should be written.");
+
+    add(&IOSwitchboardServerFlags::stdout_from_fd,
+        "stdout_from_fd",
+        "The file descriptor that should be read to consume stdout data.");
+
+    add(&IOSwitchboardServerFlags::stdout_to_fd,
+        "stdout_to_fd",
+        "A file descriptor where data read from\n"
+        "'stdout_from_fd' should be redirected to.");
+
+    add(&IOSwitchboardServerFlags::stderr_from_fd,
+        "stderr_from_fd",
+        "The file descriptor that should be read to consume stderr data.");
+
+    add(&IOSwitchboardServerFlags::stderr_to_fd,
+        "stderr_to_fd",
+        "A file descriptor where data read from\n"
+        "'stderr_from_fd' should be redirected to.");
+
+    add(&IOSwitchboardServerFlags::socket_path,
+        "socket_address",
+        "The path of the unix domain socket this\n"
+        "io switchboard should attach itself to.");
+  }
+
+  int stdin_to_fd;
+  int stdout_from_fd;
+  int stdout_to_fd;
+  int stderr_from_fd;
+  int stderr_to_fd;
+  std::string socket_path;
+};
+#endif // __WINDOWS__
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4223786a/src/tests/containerizer/io_switchboard_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp 
b/src/tests/containerizer/io_switchboard_tests.cpp
new file mode 100644
index 0000000..54760ea
--- /dev/null
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <process/owned.hpp>
+
+#include <stout/os.hpp>
+#include <stout/uuid.hpp>
+
+#include "slave/containerizer/mesos/io/switchboard.hpp"
+
+#include "tests/mesos.hpp"
+
+using mesos::internal::slave::IOSwitchboardServer;
+
+using process::Future;
+using process::Owned;
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class IOSwitchboardTest : public TemporaryDirectoryTest {};
+
+
+#ifndef __WINDOWS__
+TEST_F(IOSwitchboardTest, ServerRedirectLog)
+{
+  int stdoutPipe[2];
+  int stderrPipe[2];
+
+  Try<int> nullFd = os::open("/dev/null", O_RDWR);
+  ASSERT_SOME(nullFd);
+
+  Try<Nothing> pipe = os::pipe(stdoutPipe);
+  ASSERT_SOME(pipe);
+
+  pipe = os::pipe(stderrPipe);
+  ASSERT_SOME(pipe);
+
+  string stdoutPath = path::join(sandbox.get(), "stdout");
+  Try<int> stdoutFd = os::open(
+      stdoutPath,
+      O_RDWR | O_CREAT,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  ASSERT_SOME(stdoutFd);
+
+  string stderrPath = path::join(sandbox.get(), "stderr");
+  Try<int> stderrFd = os::open(
+      stderrPath,
+      O_RDWR | O_CREAT,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  ASSERT_SOME(stderrFd);
+
+  string socketPath = path::join(
+      sandbox.get(),
+      "mesos-io-switchboard-" + UUID::random().toString());
+
+  Try<Owned<IOSwitchboardServer>> server = IOSwitchboardServer::create(
+      nullFd.get(),
+      stdoutPipe[0],
+      stdoutFd.get(),
+      stderrPipe[0],
+      stderrFd.get(),
+      socketPath);
+
+  ASSERT_SOME(server);
+
+  Future<Nothing> runServer  = server.get()->run();
+
+  string data =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+    "culpa qui officia deserunt mollit anim id est laborum.";
+
+  while (Bytes(data.size()) < Megabytes(1)) {
+    data.append(data);
+  }
+
+  Try<Nothing> write = os::write(stdoutPipe[1], data);
+  ASSERT_SOME(write);
+
+  write = os::write(stderrPipe[1], data);
+  ASSERT_SOME(write);
+
+  os::close(stdoutPipe[1]);
+  os::close(stderrPipe[1]);
+
+  AWAIT_ASSERT_READY(runServer);
+
+  os::close(nullFd.get());
+  os::close(stdoutPipe[0]);
+  os::close(stderrPipe[0]);
+  os::close(stdoutFd.get());
+  os::close(stderrFd.get());
+
+  Try<string> read = os::read(stdoutPath);
+  ASSERT_SOME(read);
+
+  EXPECT_EQ(data, read.get());
+
+  read = os::read(stderrPath);
+  ASSERT_SOME(read);
+
+  EXPECT_EQ(data, read.get());
+}
+#endif // __WINDOWS__
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {

Reply via email to