Repository: mesos
Updated Branches:
  refs/heads/master 800c629a8 -> 0e0fc463b


Added API handler for LAUNCH_NESTED_CONTAINER_SESSION.

In addition to launching the nested container the API handler
ensures that the container is destroyed if the connection breaks.

Review: https://reviews.apache.org/r/54196


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e0fc463
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e0fc463
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e0fc463

Branch: refs/heads/master
Commit: 0e0fc463bc18171db656e0b6892811d75644dbd1
Parents: ec009f4
Author: Vinod Kone <vinodk...@gmail.com>
Authored: Sun Nov 20 18:24:51 2016 +0800
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Thu Dec 1 21:54:28 2016 -0800

----------------------------------------------------------------------
 src/slave/http.cpp       | 175 +++++++++++++++++++++++++++++++++++++++++-
 src/slave/slave.hpp      |   8 ++
 src/slave/validation.cpp |  26 ++++++-
 src/tests/api_tests.cpp  |  84 ++++++++++++++++++++
 4 files changed, 287 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0e0fc463/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 5c300be..8b104ce 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -30,6 +30,8 @@
 
 #include <mesos/executor/executor.hpp>
 
+#include <mesos/slave/containerizer.hpp>
+
 #include <mesos/v1/agent/agent.hpp>
 
 #include <mesos/v1/executor/executor.hpp>
@@ -71,6 +73,7 @@ using mesos::agent::ProcessIO;
 
 using mesos::internal::recordio::Reader;
 
+using mesos::slave::ContainerClass;
 using mesos::slave::ContainerTermination;
 
 using process::AUTHENTICATION;
@@ -498,7 +501,8 @@ Future<Response> Slave::Http::_api(
       return killNestedContainer(call, acceptType, principal);
 
     case mesos::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION:
-      return NotImplemented();
+      return launchNestedContainerSession(
+          call, contentType, acceptType, principal);
 
     case mesos::agent::Call::ATTACH_CONTAINER_INPUT:
       CHECK_SOME(reader);
@@ -2038,6 +2042,7 @@ Future<Response> Slave::Http::launchNestedContainer(
           call.launch_nested_container().has_container()
             ? call.launch_nested_container().container()
             : Option<ContainerInfo>::none(),
+          ContainerClass::DEFAULT,
           acceptType,
           approver);
     }));
@@ -2048,6 +2053,7 @@ Future<Response> Slave::Http::_launchNestedContainer(
     const ContainerID& containerId,
     const CommandInfo& commandInfo,
     const Option<ContainerInfo>& containerInfo,
+    const Option<ContainerClass>& containerClass,
     ContentType acceptType,
     const Owned<ObjectApprover>& approver) const
 {
@@ -2111,7 +2117,8 @@ Future<Response> Slave::Http::_launchNestedContainer(
       commandInfo,
       containerInfo,
       user,
-      slave->info.id());
+      slave->info.id(),
+      containerClass);
 
   // TODO(bmahler): The containerizers currently require that
   // the caller calls destroy if the launch fails. See MESOS-6214.
@@ -2122,8 +2129,8 @@ Future<Response> Slave::Http::_launchNestedContainer(
 
       slave->containerizer->destroy(containerId)
         .onFailed([=](const string& failure) {
-          LOG(ERROR) << "Failed to destroy nested container "
-                     << containerId << " after launch failure: " << failure;
+          LOG(ERROR) << "Failed to destroy nested container " << containerId
+                     << " after launch failure: " << failure;
         });
     }));
 
@@ -2378,6 +2385,166 @@ Future<Response> Slave::Http::attachContainerInput(
 }
 
 
+// Helper that reads data from `writer` and writes to `reader`.
+// Returns a failed future if there are any errors reading or writing.
+// The future is satisfied when we get a EOF.
+// TODO(vinod): Move this to libprocess if this is more generally useful.
+Future<Nothing> connect(Pipe::Reader reader, Pipe::Writer writer)
+{
+  return reader.read()
+    .then([reader, writer](const Future<string>& chunk) mutable
+        -> Future<Nothing> {
+      if (!chunk.isReady()) {
+        return process::Failure(
+            chunk.isFailed() ? chunk.failure() : "discarded");
+      }
+
+      if (chunk->empty()) {
+        // EOF case.
+        return Nothing();
+      }
+
+      if (!writer.write(chunk.get())) {
+        return process::Failure("Write failed to the pipe");
+      }
+
+      return connect(reader, writer);
+    });
+}
+
+
+Future<Response> Slave::Http::launchNestedContainerSession(
+    const mesos::agent::Call& call,
+    ContentType contentType,
+    ContentType acceptType,
+    const Option<string>& principal) const
+{
+  CHECK_EQ(mesos::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION, call.type());
+  CHECK(call.has_launch_nested_container_session());
+
+  const ContainerID& containerId =
+    call.launch_nested_container_session().container_id();
+
+  Future<Owned<ObjectApprover>> approver;
+
+  if (slave->authorizer.isSome()) {
+    authorization::Subject subject;
+    if (principal.isSome()) {
+      subject.set_value(principal.get());
+    }
+
+    approver = slave->authorizer.get()->getObjectApprover(
+        subject, authorization::LAUNCH_NESTED_CONTAINER_SESSION);
+  } else {
+    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
+  }
+
+  Future<Response> response = approver
+    .then(defer(slave->self(), [=](const Owned<ObjectApprover>& approver) {
+      return _launchNestedContainer(
+          call.launch_nested_container_session().container_id(),
+          call.launch_nested_container_session().command(),
+          call.launch_nested_container_session().has_container()
+            ? call.launch_nested_container_session().container()
+            : Option<ContainerInfo>::none(),
+          ContainerClass::DEBUG,
+          acceptType,
+          approver);
+    }));
+
+  // Helper to destroy the container.
+  auto destroy = [this](const ContainerID& containerId) {
+    slave->containerizer->destroy(containerId)
+      .onFailed([containerId](const string& failure) {
+        LOG(ERROR) << "Failed to destroy nested container "
+                   << containerId << ": " << failure;
+      });
+  };
+
+  // If `response` has failed or is not `OK`, the container will be
+  // destroyed by `_launchNestedContainer`.
+  return response
+    .then(defer(slave->self(),
+                [=](const Response& response) -> Future<Response> {
+      if (response.status != OK().status) {
+        return response;
+      }
+
+      // If launch is successful, attach to the container output.
+      mesos::agent::Call call;
+      call.set_type(mesos::agent::Call::ATTACH_CONTAINER_OUTPUT);
+      call.mutable_attach_container_output()->mutable_container_id()
+          ->CopyFrom(containerId);
+
+      // Instead of directly returning the response of `attachContainerOutput`
+      // to the client, we use a level of indirection to make sure the 
container
+      // is destroyed when the client connection breaks.
+      return attachContainerOutput(call, contentType, acceptType, principal)
+        .then(defer(slave->self(),
+                    [=](const Response& response) -> Future<Response> {
+          Pipe pipe;
+          Pipe::Writer writer = pipe.writer();
+
+          OK ok;
+          ok.headers["Content-Type"] = stringify(acceptType);
+          ok.type = Response::PIPE;
+          ok.reader = pipe.reader();
+
+          CHECK_EQ(Response::PIPE, response.type);
+          CHECK_SOME(response.reader);
+          Pipe::Reader reader = response.reader.get();
+
+          // Read from the `response` pipe and write to
+          // the client's response pipe.
+          // NOTE: Need to cast the lambda to std::function here because of a
+          // limitation of `defer`; `defer` does not work with `mutable` 
lambda.
+          std::function<void (const Future<Nothing>&)> _connect =
+            [=](const Future<Nothing>& future) mutable {
+              CHECK(!future.isDiscarded());
+
+              if (future.isFailed()) {
+                LOG(WARNING) << "Failed to send attach response for "
+                             << containerId << ": " << future.failure();
+
+                writer.fail(future.failure());
+                reader.close();
+              } else {
+                // EOF case.
+                LOG(INFO) << "Received EOF attach response for " << 
containerId;
+
+                writer.close();
+                reader.close();
+              }
+
+              destroy(containerId);
+          };
+
+          connect(reader, writer)
+            .onAny(defer(slave->self(), _connect));
+
+          // Destroy the container if the connection to client is closed.
+          writer.readerClosed()
+            .onAny(defer(slave->self(), [=](const Future<Nothing>& future) {
+              LOG(WARNING)
+                << "Launch nested container session connection"
+                << " for container " << containerId << " closed"
+                << (future.isFailed() ? ": " + future.failure() : "");
+
+              destroy(containerId);
+            }));
+
+          return ok;
+        }))
+        .onFailed(defer(slave->self(), [=](const string& failure) {
+          LOG(WARNING) << "Failed to attach to nested container "
+                       << containerId << ": " << failure;
+
+          destroy(containerId);
+        }));
+    }));
+}
+
+
 Future<Response> Slave::Http::attachContainerOutput(
     const mesos::agent::Call& call,
     ContentType contentType,

http://git-wip-us.apache.org/repos/asf/mesos/blob/0e0fc463/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dacdbcf..4b94dff 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -38,6 +38,7 @@
 
 #include <mesos/module/authenticatee.hpp>
 
+#include <mesos/slave/containerizer.hpp>
 #include <mesos/slave/qos_controller.hpp>
 #include <mesos/slave/resource_estimator.hpp>
 
@@ -644,6 +645,7 @@ private:
         const ContainerID& containerId,
         const CommandInfo& commandInfo,
         const Option<ContainerInfo>& containerInfo,
+        const Option<mesos::slave::ContainerClass>& containerClass,
         ContentType acceptType,
         const Owned<ObjectApprover>& approver) const;
 
@@ -657,6 +659,12 @@ private:
         ContentType acceptType,
         const Option<std::string>& principal) const;
 
+    process::Future<process::http::Response> launchNestedContainerSession(
+        const mesos::agent::Call& call,
+        ContentType contentType,
+        ContentType acceptType,
+        const Option<std::string>& principal) const;
+
     process::Future<process::http::Response> attachContainerInput(
         const mesos::agent::Call& call,
         process::Owned<recordio::Reader<agent::Call>>&& decoder,

http://git-wip-us.apache.org/repos/asf/mesos/blob/0e0fc463/src/slave/validation.cpp
----------------------------------------------------------------------
diff --git a/src/slave/validation.cpp b/src/slave/validation.cpp
index 4005cfc..15330ad 100644
--- a/src/slave/validation.cpp
+++ b/src/slave/validation.cpp
@@ -222,8 +222,30 @@ Option<Error> validate(
       return None();
     }
 
-    case mesos::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION:
-      return Error("Unsupported");
+    case mesos::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION: {
+      if (!call.has_launch_nested_container_session()) {
+        return Error(
+            "Expecting 'launch_nested_container_session' to be present");
+      }
+
+      Option<Error> error = validation::container::validateContainerId(
+          call.launch_nested_container_session().container_id());
+
+      if (error.isSome()) {
+        return Error("'launch_nested_container_session.container_id' is 
invalid"
+                     ": " + error->message);
+      }
+
+      // The parent `ContainerID` is required, so that we know
+      // which container to place it underneath.
+      if (!call.launch_nested_container_session().container_id().has_parent()) 
{
+        return Error(
+            "Expecting 'launch_nested_container_session.container_id.parent'"
+            " to be present");
+      }
+
+      return None();
+    }
 
     case mesos::agent::Call::ATTACH_CONTAINER_INPUT: {
       if (!call.has_attach_container_input()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/0e0fc463/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index ea6e037..afae6a7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -3611,6 +3611,90 @@ TEST_P(AgentAPITest, NestedContainerLaunch)
 }
 
 
+// This test verifies that launch nested container session fails when
+// attaching to the output of the container fails. Consequently, the
+// launched container should be destroyed.
+TEST_P(AgentAPITest, LaunchNestedContainerSessionAttachFailure)
+{
+  ContentType contentType = GetParam();
+
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer);
+
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 0.1, 32, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<Nothing> executorRegistered;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(FutureSatisfy(&executorRegistered));
+
+  EXPECT_CALL(exec, launchTask(_, _));
+
+  driver.start();
+
+  AWAIT_READY(executorRegistered);
+
+  Future<hashset<ContainerID>> containerIds = containerizer.containers();
+  AWAIT_READY(containerIds);
+  EXPECT_EQ(1u, containerIds->size());
+
+  v1::ContainerID containerId;
+  containerId.set_value(UUID::random().toString());
+  containerId.mutable_parent()->set_value(containerIds->begin()->value());
+
+  v1::agent::Call call;
+  call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+  call.mutable_launch_nested_container_session()->mutable_container_id()
+    ->CopyFrom(containerId);
+
+  Future<http::Response> response = http::streaming::post(
+    slave.get()->pid,
+    "api/v1",
+    createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+    serialize(contentType, call),
+    stringify(contentType));
+
+  // Launch should fail because test containerizer doesn't support `attach`.
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::InternalServerError().status, 
response);
+
+  // Settle the clock here to ensure any pending callbacks are executed.
+  Clock::settle();
+
+  // Attach failure should result in the destruction of nested container.
+  containerIds = containerizer.containers();
+  AWAIT_READY(containerIds);
+  EXPECT_EQ(1u, containerIds->size());
+  EXPECT_FALSE(containerIds->contains(devolve(containerId)));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
 // TODO(vinod): Update the test when mesos containerizer
 // adds support for `attach`.
 TEST_P(AgentAPITest, AttachContainerOutputFailure)

Reply via email to