Added support to handle ATTACH_CONTAINER_OUPUT in the io switchbaord.

Review: https://reviews.apache.org/r/53974


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dcf0f1cb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dcf0f1cb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dcf0f1cb

Branch: refs/heads/master
Commit: dcf0f1cb1da0a8d0a42dcc89f5acfd7e1e010e5c
Parents: 5e387f4
Author: Kevin Klues <klue...@gmail.com>
Authored: Wed Nov 23 17:51:29 2016 -0800
Committer: Jie Yu <yujie....@gmail.com>
Committed: Thu Dec 1 10:11:45 2016 -0800

----------------------------------------------------------------------
 .../containerizer/mesos/io/switchboard.cpp      | 345 ++++++++++++++++---
 .../containerizer/mesos/io/switchboard.hpp      |  12 +-
 .../containerizer/mesos/io/switchboard_main.cpp |   3 +-
 .../containerizer/io_switchboard_tests.cpp      | 166 +++++++++
 4 files changed, 466 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dcf0f1cb/src/slave/containerizer/mesos/io/switchboard.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.cpp 
b/src/slave/containerizer/mesos/io/switchboard.cpp
index 3118d98..c4c9fc7 100644
--- a/src/slave/containerizer/mesos/io/switchboard.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard.cpp
@@ -14,6 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <list>
 #include <map>
 #include <string>
 #include <vector>
@@ -35,13 +36,17 @@
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
+#include <stout/recordio.hpp>
 
+#include <mesos/http.hpp>
 #include <mesos/type_utils.hpp>
 
 #include <mesos/agent/agent.hpp>
 
 #include <mesos/slave/container_logger.hpp>
 
+#include "common/http.hpp"
+
 #include "slave/flags.hpp"
 #include "slave/state.hpp"
 
@@ -65,6 +70,7 @@ using process::Process;
 using process::Promise;
 using process::Subprocess;
 
+using std::list;
 using std::map;
 using std::string;
 using std::vector;
@@ -275,6 +281,7 @@ Future<Option<ContainerLaunchInfo>> IOSwitchboard::_prepare(
   switchboardFlags.stdout_to_fd = STDOUT_FILENO;
   switchboardFlags.stderr_from_fd = errfds[0];
   switchboardFlags.stderr_to_fd = STDERR_FILENO;
+  switchboardFlags.wait_for_connection = false;
   switchboardFlags.socket_path = path::join(
       stringify(os::PATH_SEPARATOR),
       "tmp",
@@ -422,11 +429,43 @@ public:
       int _stdoutToFd,
       int _stderrFromFd,
       int _stderrToFd,
-      const unix::Socket& _socket);
+      const unix::Socket& _socket,
+      bool waitForConnection);
+
+  virtual void finalize();
 
   Future<Nothing> run();
 
 private:
+  class HttpConnection
+  {
+  public:
+    HttpConnection(
+        const http::Pipe::Writer& _writer,
+        const ContentType& contentType)
+      : writer(_writer),
+        encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
+
+    bool send(const agent::ProcessIO& message)
+    {
+      return writer.write(encoder.encode(message));
+    }
+
+    bool close()
+    {
+      return writer.close();
+    }
+
+    process::Future<Nothing> closed() const
+    {
+      return writer.readerClosed();
+    }
+
+  private:
+    http::Pipe::Writer writer;
+    ::recordio::Encoder<agent::ProcessIO> encoder;
+  };
+
   // Sit in an accept loop forever.
   void acceptLoop();
 
@@ -437,6 +476,16 @@ private:
   // with the same format we receive them in.
   Future<http::Response> handler(const http::Request& request);
 
+  // Handle `ATTACH_CONTAINER_INPUT` calls.
+  Future<http::Response> attachContainerInput(
+    const agent::Call& call,
+    const ContentType& contentType);
+
+  // Handle `ATTACH_CONTAINER_OUTPUT` calls.
+  Future<http::Response> attachContainerOutput(
+    const agent::Call& call,
+    const ContentType& contentType);
+
   // Asynchronously receive data as we read it from our
   // `stdoutFromFd` and `stdoutFromFd` file descriptors.
   void outputHook(
@@ -449,7 +498,13 @@ private:
   int stderrFromFd;
   int stderrToFd;
   unix::Socket socket;
+  bool waitForConnection;
   Promise<Nothing> promise;
+  Promise<Nothing> startRedirect;
+  // The following must be a `std::list`
+  // for proper erase semantics later on.
+  list<HttpConnection> connections;
+  Option<Failure> failure;
 };
 
 
@@ -459,7 +514,8 @@ Try<Owned<IOSwitchboardServer>> IOSwitchboardServer::create(
     int stdoutToFd,
     int stderrFromFd,
     int stderrToFd,
-    const string& socketPath)
+    const string& socketPath,
+    bool waitForConnection)
 {
   Try<unix::Socket> socket = unix::Socket::create();
   if (socket.isError()) {
@@ -490,7 +546,8 @@ Try<Owned<IOSwitchboardServer>> IOSwitchboardServer::create(
       stdoutToFd,
       stderrFromFd,
       stderrToFd,
-      socket.get());
+      socket.get(),
+      waitForConnection);
 }
 
 
@@ -500,14 +557,16 @@ IOSwitchboardServer::IOSwitchboardServer(
     int stdoutToFd,
     int stderrFromFd,
     int stderrToFd,
-    const unix::Socket& socket)
+    const unix::Socket& socket,
+    bool waitForConnection)
   : process(new IOSwitchboardServerProcess(
         stdinToFd,
         stdoutFromFd,
         stdoutToFd,
         stderrFromFd,
         stderrToFd,
-        socket))
+        socket,
+        waitForConnection))
 {
   spawn(process.get());
 }
@@ -532,67 +591,93 @@ IOSwitchboardServerProcess::IOSwitchboardServerProcess(
     int _stdoutToFd,
     int _stderrFromFd,
     int _stderrToFd,
-    const unix::Socket& _socket)
+    const unix::Socket& _socket,
+    bool _waitForConnection)
   : stdinToFd(_stdinToFd),
     stdoutFromFd(_stdoutFromFd),
     stdoutToFd(_stdoutToFd),
     stderrFromFd(_stderrFromFd),
     stderrToFd(_stderrToFd),
-    socket(_socket) {}
+    socket(_socket),
+    waitForConnection(_waitForConnection) {}
 
 
 Future<Nothing> IOSwitchboardServerProcess::run()
 {
-  Future<Nothing> stdoutRedirect = process::io::redirect(
-      stdoutFromFd,
-      stdoutToFd,
-      4096,
-      {defer(self(),
-             &Self::outputHook,
-             lambda::_1,
-             agent::ProcessIO::Data::STDOUT)});
-
-  Future<Nothing> stderrRedirect = process::io::redirect(
-      stderrFromFd,
-      stderrToFd,
-      4096,
-      {defer(self(),
-             &Self::outputHook,
-             lambda::_1,
-             agent::ProcessIO::Data::STDERR)});
-
-  // Set the future once our IO redirects finish. On failure,
-  // fail the future.
-  //
-  // For now we simply assume that whenever both `stdoutRedirect`
-  // and `stderrRedirect` have completed then it is OK to exit the
-  // switchboard process. We assume this because `stdoutRedirect`
-  // and `stderrRedirect` will only complete after both the read end
-  // of the `stdout` stream and the read end of the `stderr` stream
-  // have been drained. Since draining these `fds` represents having
-  // read everything possible from a container's `stdout` and
-  // `stderr` this is likely sufficient termination criteria.
-  // However, there's a non-zero chance that *some* containers may
-  // decide to close their `stdout` and `stderr` while expecting to
-  // continue reading from `stdin`. For now we don't support
-  // containers with this behavior and we will exit out of the
-  // switchboard process early.
-  //
-  // TODO(klueska): Add support to asynchronously detect when
-  // `stdinToFd` has become invalid before deciding to terminate.
-  stdoutRedirect
-    .onFailed(defer(self(), [this](const string& message) {
-       promise.fail("Failed redirecting stdout: " + message);
-    }));
-
-  stderrRedirect
-    .onFailed(defer(self(), [this](const string& message) {
-       promise.fail("Failed redirecting stderr: " + message);
-    }));
+  if (!waitForConnection) {
+    startRedirect.set(Nothing());
+  }
 
-  collect(stdoutRedirect, stderrRedirect)
+  startRedirect.future()
     .then(defer(self(), [this]() {
-      promise.set(Nothing());
+      Future<Nothing> stdoutRedirect = process::io::redirect(
+          stdoutFromFd,
+          stdoutToFd,
+          4096,
+          {defer(self(),
+                 &Self::outputHook,
+                 lambda::_1,
+                 agent::ProcessIO::Data::STDOUT)});
+
+      Future<Nothing> stderrRedirect = process::io::redirect(
+          stderrFromFd,
+          stderrToFd,
+          4096,
+          {defer(self(),
+                 &Self::outputHook,
+                 lambda::_1,
+                 agent::ProcessIO::Data::STDERR)});
+
+      // Set the future once our IO redirects finish. On failure,
+      // fail the future.
+      //
+      // For now we simply assume that whenever both `stdoutRedirect`
+      // and `stderrRedirect` have completed then it is OK to exit the
+      // switchboard process. We assume this because `stdoutRedirect`
+      // and `stderrRedirect` will only complete after both the read end
+      // of the `stdout` stream and the read end of the `stderr` stream
+      // have been drained. Since draining these `fds` represents having
+      // read everything possible from a container's `stdout` and
+      // `stderr` this is likely sufficient termination criteria.
+      // However, there's a non-zero chance that *some* containers may
+      // decide to close their `stdout` and `stderr` while expecting to
+      // continue reading from `stdin`. For now we don't support
+      // containers with this behavior and we will exit out of the
+      // switchboard process early.
+      //
+      // NOTE: We always call `terminate()` with `false` to ensure
+      // that our event queue is drained before actually terminating.
+      // Without this, it's possible that we might drop some data we
+      // are trying to write out over any open connections we have.
+      //
+      // TODO(klueska): Add support to asynchronously detect when
+      // `stdinToFd` has become invalid before deciding to terminate.
+      stdoutRedirect
+        .onFailed(defer(self(), [this](const string& message) {
+           failure = Failure("Failed redirecting stdout: " + message);
+           terminate(self(), false);
+        }))
+        .onDiscarded(defer(self(), [this]() {
+           failure = Failure("Redirecting stdout discarded");
+           terminate(self(), false);
+        }));
+
+      stderrRedirect
+        .onFailed(defer(self(), [this](const string& message) {
+           failure = Failure("Failed redirecting stderr: " + message);
+           terminate(self(), false);
+        }))
+        .onDiscarded(defer(self(), [this]() {
+           failure = Failure("Redirecting stderr discarded");
+           terminate(self(), false);
+        }));
+
+      collect(stdoutRedirect, stderrRedirect)
+        .then(defer(self(), [this]() {
+          terminate(self(), false);
+          return Nothing();
+        }));
+
       return Nothing();
     }));
 
@@ -602,12 +687,27 @@ Future<Nothing> IOSwitchboardServerProcess::run()
 }
 
 
+void IOSwitchboardServerProcess::finalize()
+{
+  foreach (HttpConnection& connection, connections) {
+    connection.close();
+  }
+
+  if (failure.isSome()) {
+    promise.fail(failure->message);
+  } else {
+    promise.set(Nothing());
+  }
+}
+
+
 void IOSwitchboardServerProcess::acceptLoop()
 {
   socket.accept()
     .onAny(defer(self(), [this](const Future<unix::Socket>& socket) {
       if (!socket.isReady()) {
-        promise.fail("Failed trying to accept connection");
+        failure = Failure("Failed trying to accept connection");
+        terminate(self(), false);
       }
 
       // We intentionally ignore errors on the serve path, and assume
@@ -628,7 +728,117 @@ void IOSwitchboardServerProcess::acceptLoop()
 Future<http::Response> IOSwitchboardServerProcess::handler(
     const http::Request& request)
 {
-  return http::BadRequest("Unsupported");
+  if (request.method != "POST") {
+    return http::MethodNotAllowed({"POST"}, request.method);
+  }
+
+  agent::Call call;
+
+  Option<string> contentType =
+    request.headers.get(strings::lower("Content-Type"));
+
+  if (contentType.isNone()) {
+    return http::BadRequest("Expecting 'Content-Type' to be present");
+  }
+
+  if (contentType.get() == APPLICATION_PROTOBUF) {
+    if (!call.ParseFromString(request.body)) {
+      return http::BadRequest("Failed to parse body into Call protobuf");
+    }
+  } else if (contentType.get() == APPLICATION_JSON) {
+    Try<JSON::Value> value = JSON::parse(request.body);
+
+    if (value.isError()) {
+      return http::BadRequest("Failed to parse body into JSON:"
+                              " " + value.error());
+    }
+
+    Try<agent::Call> parse = ::protobuf::parse<agent::Call>(value.get());
+    if (parse.isError()) {
+      return http::BadRequest("Failed to convert JSON into Call protobuf:"
+                              " " + parse.error());
+    }
+
+    call = parse.get();
+  } else {
+    return http::UnsupportedMediaType(
+        "Expecting 'Content-Type' of '" + stringify(APPLICATION_JSON) +
+        "' or '" + stringify(APPLICATION_PROTOBUF) + "'");
+  }
+
+  ContentType acceptType;
+  if (request.acceptsMediaType(APPLICATION_JSON)) {
+    acceptType = ContentType::JSON;
+  } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+    acceptType = ContentType::PROTOBUF;
+  } else {
+    return http::NotAcceptable(
+        "Expecting 'Accept' to allow '" + stringify(APPLICATION_JSON) + "'"
+        " or '" + stringify(APPLICATION_PROTOBUF) + "'");
+  }
+
+  switch (call.type()) {
+    case agent::Call::ATTACH_CONTAINER_INPUT:
+      return attachContainerInput(call, acceptType);
+
+    case agent::Call::ATTACH_CONTAINER_OUTPUT:
+      return attachContainerOutput(call, acceptType);
+
+    default:
+      return http::NotImplemented();
+  }
+
+  UNREACHABLE();
+}
+
+
+Future<http::Response> IOSwitchboardServerProcess::attachContainerInput(
+    const agent::Call& call,
+    const ContentType& contentType)
+{
+  CHECK_EQ(agent::Call::ATTACH_CONTAINER_INPUT, call.type());
+  return http::NotImplemented("ATTACH_CONTAINER_INPUT");
+}
+
+
+Future<http::Response> IOSwitchboardServerProcess::attachContainerOutput(
+    const agent::Call& call,
+    const ContentType& contentType)
+{
+  CHECK_EQ(agent::Call::ATTACH_CONTAINER_OUTPUT, call.type());
+
+  http::Pipe pipe;
+  http::OK ok;
+
+  ok.headers["Content-Type"] = stringify(contentType);
+  ok.type = http::Response::PIPE;
+  ok.reader = pipe.reader();
+
+  // We store the connection in a list and wait for asynchronous
+  // calls to `receiveOutput()` to actually push data out over the
+  // connection. If we ever detect a connection has been closed,
+  // we remove it from this list.
+  HttpConnection connection(pipe.writer(), contentType);
+  auto iterator = connections.insert(connections.end(), connection);
+
+  // We use the `startRedirect` promise to indicate when we should
+  // begin reading data from our `stdoutFromFd` and `stderrFromFd`
+  // file descriptors. If we were started with the `waitForConnection`
+  // parameter set to `true`, only set this promise here once the
+  // first connection has been established.
+  if (!startRedirect.future().isReady()) {
+    startRedirect.set(Nothing());
+  }
+
+  connection.closed()
+    .then(defer(self(), [this, iterator]() {
+      // Erasing from a `std::list` only invalidates the iterator of
+      // the object being erased. All other iterators remain valid.
+      connections.erase(iterator);
+      return Nothing();
+    }));
+
+  return ok;
 }
 
 
@@ -636,6 +846,27 @@ void IOSwitchboardServerProcess::outputHook(
     const string& data,
     const agent::ProcessIO::Data::Type& type)
 {
+  // Break early if there are no connections to send the data to.
+  if (connections.size() == 0) {
+    return;
+  }
+
+  // Build a `ProcessIO` message from the data.
+  agent::ProcessIO message;
+  message.set_type(agent::ProcessIO::DATA);
+  message.mutable_data()->set_type(type);
+  message.mutable_data()->set_data(data);
+
+  // Walk through our list of connections and write the message to
+  // them. It's possible that a write might fail if the writer has
+  // been closed. That's OK because we already take care of removing
+  // closed connections from our list via the future returned by
+  // the `HttpConnection::closed()` call above. We might do a few
+  // unnecessary writes if we have a bunch of messages queued up,
+  // but that shouldn't be a problem.
+  foreach (HttpConnection& connection, connections) {
+    connection.send(message);
+  }
 }
 #endif // __WINDOWS__
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/dcf0f1cb/src/slave/containerizer/mesos/io/switchboard.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard.hpp 
b/src/slave/containerizer/mesos/io/switchboard.hpp
index 5045dfd..c0db39b 100644
--- a/src/slave/containerizer/mesos/io/switchboard.hpp
+++ b/src/slave/containerizer/mesos/io/switchboard.hpp
@@ -120,7 +120,8 @@ public:
       int stdoutToFd,
       int stderrFromFd,
       int stderrToFd,
-      const std::string& socketPath);
+      const std::string& socketPath,
+      bool waitForConnection = false);
 
   ~IOSwitchboardServer();
 
@@ -133,7 +134,8 @@ private:
       int stdoutToFd,
       int stderrFromFd,
       int stderrToFd,
-      const process::network::unix::Socket& socket);
+      const process::network::unix::Socket& socket,
+      bool waitForConnection);
 
   process::Owned<IOSwitchboardServerProcess> process;
 };
@@ -183,6 +185,11 @@ struct IOSwitchboardServerFlags : public virtual 
flags::FlagsBase
         "A file descriptor where data read from\n"
         "'stderr_from_fd' should be redirected to.");
 
+    add(&IOSwitchboardServerFlags::wait_for_connection,
+        "wait_for_connection",
+        "A boolean indicating whether the server should wait for the\n"
+        "first connection before reading any data from the '*_from_fd's.");
+
     add(&IOSwitchboardServerFlags::socket_path,
         "socket_address",
         "The path of the unix domain socket this\n"
@@ -195,6 +202,7 @@ struct IOSwitchboardServerFlags : public virtual 
flags::FlagsBase
   int stderr_from_fd;
   int stderr_to_fd;
   std::string socket_path;
+  bool wait_for_connection;
 };
 #endif // __WINDOWS__
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/dcf0f1cb/src/slave/containerizer/mesos/io/switchboard_main.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/io/switchboard_main.cpp 
b/src/slave/containerizer/mesos/io/switchboard_main.cpp
index 79552b9..923a765 100644
--- a/src/slave/containerizer/mesos/io/switchboard_main.cpp
+++ b/src/slave/containerizer/mesos/io/switchboard_main.cpp
@@ -43,7 +43,8 @@ int main(int argc, char** argv)
       flags.stdout_to_fd,
       flags.stderr_from_fd,
       flags.stderr_to_fd,
-      flags.socket_path);
+      flags.socket_path,
+      flags.wait_for_connection);
 
   if (server.isError()) {
     EXIT(EXIT_FAILURE) << "Failed to create the io switchboard server:"

http://git-wip-us.apache.org/repos/asf/mesos/blob/dcf0f1cb/src/tests/containerizer/io_switchboard_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/io_switchboard_tests.cpp 
b/src/tests/containerizer/io_switchboard_tests.cpp
index 54760ea..d82f22b 100644
--- a/src/tests/containerizer/io_switchboard_tests.cpp
+++ b/src/tests/containerizer/io_switchboard_tests.cpp
@@ -16,15 +16,36 @@
 
 #include <string>
 
+#include <process/address.hpp>
+#include <process/future.hpp>
+#include <process/http.hpp>
 #include <process/owned.hpp>
 
+#include <stout/json.hpp>
 #include <stout/os.hpp>
+#include <stout/protobuf.hpp>
 #include <stout/uuid.hpp>
 
+#include <mesos/http.hpp>
+
+#include <mesos/agent/agent.hpp>
+
+#include "common/http.hpp"
+#include "common/recordio.hpp"
+
 #include "slave/containerizer/mesos/io/switchboard.hpp"
 
 #include "tests/mesos.hpp"
 
+namespace http = process::http;
+
+#ifndef __WINDOWS__
+namespace unix = process::network::unix;
+#endif // __WINDOWS__
+
+using mesos::agent::Call;
+
+
 using mesos::internal::slave::IOSwitchboardServer;
 
 using process::Future;
@@ -126,6 +147,151 @@ TEST_F(IOSwitchboardTest, ServerRedirectLog)
 
   EXPECT_EQ(data, read.get());
 }
+
+
+TEST_F(IOSwitchboardTest, ServerAttachOutput)
+{
+  Try<int> nullFd = os::open("/dev/null", O_RDWR);
+  ASSERT_SOME(nullFd);
+
+  string stdoutPath = path::join(sandbox.get(), "stdout");
+  Try<int> stdoutFd = os::open(
+      stdoutPath,
+      O_WRONLY | O_CREAT,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  ASSERT_SOME(stdoutFd);
+
+  string stderrPath = path::join(sandbox.get(), "stderr");
+  Try<int> stderrFd = os::open(
+      stderrPath,
+      O_WRONLY | O_CREAT,
+      S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+  ASSERT_SOME(stderrFd);
+
+  string data =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
+    "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in "
+    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
+    "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
+    "culpa qui officia deserunt mollit anim id est laborum.";
+
+  while (Bytes(data.size()) < Megabytes(1)) {
+    data.append(data);
+  }
+
+  Try<Nothing> write = os::write(stdoutFd.get(), data);
+  ASSERT_SOME(write);
+
+  write = os::write(stderrFd.get(), data);
+  ASSERT_SOME(write);
+
+  os::close(stdoutFd.get());
+  os::close(stderrFd.get());
+
+  stdoutFd = os::open(stdoutPath, O_RDONLY);
+  ASSERT_SOME(stdoutFd);
+
+  stderrFd = os::open(stderrPath, O_RDONLY);
+  ASSERT_SOME(stderrFd);
+
+  string socketPath = path::join(
+      sandbox.get(),
+      "mesos-io-switchboard-" + UUID::random().toString());
+
+  Try<Owned<IOSwitchboardServer>> server = IOSwitchboardServer::create(
+      nullFd.get(),
+      stdoutFd.get(),
+      nullFd.get(),
+      stderrFd.get(),
+      nullFd.get(),
+      socketPath,
+      true);
+
+  ASSERT_SOME(server);
+
+  Future<Nothing> runServer = server.get()->run();
+
+  Call call;
+  call.set_type(Call::ATTACH_CONTAINER_OUTPUT);
+
+  Call::AttachContainerOutput* attach = call.mutable_attach_container_output();
+  attach->mutable_container_id()->set_value(UUID::random().toString());
+
+  http::Request request;
+  request.method = "POST";
+  request.url = http::URL("http", "", 80, "/");
+  request.keepAlive = true;
+  request.headers["Accept"] = APPLICATION_JSON;
+  request.headers["Content-Type"] = APPLICATION_JSON;
+  request.body = stringify(JSON::protobuf(call));
+
+  Try<unix::Address> address = unix::Address::create(socketPath);
+  ASSERT_SOME(address);
+
+  Future<http::Connection> _connection = http::connect(address.get());
+  AWAIT_READY(_connection);
+
+  http::Connection connection = _connection.get();
+  Future<http::Response> response = connection.send(request, true);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(http::Response::PIPE, response.get().type);
+
+  Option<http::Pipe::Reader> reader = response.get().reader;
+  ASSERT_SOME(reader);
+
+  auto deserializer = [](const string& body) {
+    Try<JSON::Value> value = JSON::parse(body);
+    Try<agent::ProcessIO> parse =
+      ::protobuf::parse<agent::ProcessIO>(value.get());
+    return parse;
+  };
+
+  recordio::Reader<agent::ProcessIO> responseDecoder(
+      ::recordio::Decoder<agent::ProcessIO>(deserializer),
+      reader.get());
+
+  string stdoutReceived = "";
+  string stderrReceived = "";
+
+  while (true) {
+    Future<Result<agent::ProcessIO>> _message = responseDecoder.read();
+    AWAIT_READY(_message);
+
+    if (_message->isNone()) {
+      break;
+    }
+
+    ASSERT_SOME(_message.get());
+
+    agent::ProcessIO message = _message.get().get();
+
+    ASSERT_EQ(agent::ProcessIO::DATA, message.type());
+
+    ASSERT_TRUE(message.data().type() == agent::ProcessIO::Data::STDOUT ||
+                message.data().type() == agent::ProcessIO::Data::STDERR);
+
+    if (message.data().type() == agent::ProcessIO::Data::STDOUT) {
+      stdoutReceived += message.data().data();
+    } else if (message.data().type() == agent::ProcessIO::Data::STDERR) {
+      stderrReceived += message.data().data();
+    }
+  }
+
+  AWAIT_ASSERT_READY(runServer);
+
+  os::close(nullFd.get());
+  os::close(stdoutFd.get());
+  os::close(stderrFd.get());
+
+  EXPECT_EQ(data, stdoutReceived);
+  EXPECT_EQ(data, stderrReceived);
+}
 #endif // __WINDOWS__
 
 } // namespace tests {

Reply via email to