Implemented the *_NESTED_CONTAINER calls in the agent API. This patch adds the wiring for the LAUNCH_NESTED_CONTAINER, WAIT_NESTED_CONTAINER, and KILL_NESTED_CONTAINER calls, including validation and the calls into the containerizer.
Review: https://reviews.apache.org/r/52135 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5e23edd5 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5e23edd5 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5e23edd5 Branch: refs/heads/master Commit: 5e23edd513caec51ce3e94b3d785d714052525e8 Parents: 60ee731 Author: Benjamin Mahler <bmah...@apache.org> Authored: Tue Sep 20 12:20:58 2016 -0700 Committer: Benjamin Mahler <bmah...@apache.org> Committed: Thu Sep 22 12:44:14 2016 -0700 ---------------------------------------------------------------------- src/slave/http.cpp | 83 +++++++++++++++- src/tests/api_tests.cpp | 220 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 300 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/5e23edd5/src/slave/http.cpp ---------------------------------------------------------------------- diff --git a/src/slave/http.cpp b/src/slave/http.cpp index 73135be..349d307 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -66,6 +66,8 @@ #include "version/version.hpp" +using mesos::slave::ContainerTermination; + using process::AUTHENTICATION; using process::AUTHORIZATION; using process::Clock; @@ -1909,7 +1911,38 @@ Future<Response> Slave::Http::launchNestedContainer( const Option<string>& principal, ContentType contentType) const { - return Failure("Unimplemented"); + CHECK_EQ(mesos::agent::Call::LAUNCH_NESTED_CONTAINER, call.type()); + CHECK(call.has_launch_nested_container()); + + const ContainerID& containerId = + call.launch_nested_container().container_id(); + + Future<Nothing> launched = slave->containerizer->launch( + containerId, + call.launch_nested_container().command(), + call.launch_nested_container().has_container() + ? 
call.launch_nested_container().container() + : Option<ContainerInfo>::none(), + call.launch_nested_container().resources()); + + // TODO(bmahler): The containerizers currently require that + // the caller calls destroy if the launch fails. See MESOS-6214. + launched + .onFailed(defer(slave->self(), [=](const string& failure) { + LOG(WARNING) << "Failed to launch nested container " << containerId + << ": " << failure; + + slave->containerizer->destroy(containerId) + .onFailed([=](const string& failure) { + LOG(ERROR) << "Failed to destroy neseted container " << containerId + << " after launch failure: " << failure; + }); + })); + + return launched + .then([]() -> Response { + return OK(); + }); } @@ -1918,7 +1951,36 @@ Future<Response> Slave::Http::waitNestedContainer( const Option<string>& principal, ContentType contentType) const { - return Failure("Unimplemented"); + CHECK_EQ(mesos::agent::Call::WAIT_NESTED_CONTAINER, call.type()); + CHECK(call.has_wait_nested_container()); + + const ContainerID& containerId = + call.wait_nested_container().container_id(); + + Future<Option<mesos::slave::ContainerTermination>> wait = + slave->containerizer->wait(containerId); + + return wait + .then([containerId, contentType]( + const Option<ContainerTermination>& termination) -> Response { + if (termination.isNone()) { + return NotFound("Container " + stringify(containerId) + + " cannot be found"); + } + + mesos::agent::Response response; + response.set_type(mesos::agent::Response::WAIT_NESTED_CONTAINER); + + mesos::agent::Response::WaitNestedContainer* waitNestedContainer = + response.mutable_wait_nested_container(); + + if (termination->has_status()) { + waitNestedContainer->set_exit_status(termination->status()); + } + + return OK(serialize(contentType, evolve(response)), + stringify(contentType)); + }); } @@ -1927,7 +1989,22 @@ Future<Response> Slave::Http::killNestedContainer( const Option<string>& principal, ContentType contentType) const { - return 
Failure("Unimplemented"); + CHECK_EQ(mesos::agent::Call::KILL_NESTED_CONTAINER, call.type()); + CHECK(call.has_kill_nested_container()); + + const ContainerID& containerId = + call.kill_nested_container().container_id(); + + Future<bool> destroy = slave->containerizer->destroy(containerId); + + return destroy + .then([containerId](bool found) -> Response { + if (!found) { + return NotFound("Container '" + stringify(containerId) + "'" + " cannot be found (or is already killed)"); + } + return OK(); + }); } } // namespace slave { http://git-wip-us.apache.org/repos/asf/mesos/blob/5e23edd5/src/tests/api_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp index 26f99f7..e857b17 100644 --- a/src/tests/api_tests.cpp +++ b/src/tests/api_tests.cpp @@ -50,11 +50,15 @@ #include "tests/containerizer.hpp" #include "tests/mesos.hpp" +#include "tests/containerizer/mock_containerizer.hpp" + using google::protobuf::RepeatedPtrField; using mesos::master::detector::MasterDetector; using mesos::master::detector::StandaloneMasterDetector; +using mesos::slave::ContainerTermination; + using mesos::internal::recordio::Reader; using mesos::internal::slave::Slave; @@ -67,6 +71,7 @@ using process::Clock; using process::Failure; using process::Future; using process::Owned; +using process::Promise; using process::http::Accepted; using process::http::NotFound; @@ -3203,6 +3208,221 @@ TEST_P(AgentAPITest, GetState) } } + +TEST_P(AgentAPITest, NestedContainerWaitNotFound) +{ + ContentType contentType = GetParam(); + + Clock::pause(); + + StandaloneMasterDetector detector; + MockContainerizer mockContainerizer; + + EXPECT_CALL(mockContainerizer, recover(_)) + .WillOnce(Return(Future<Nothing>(Nothing()))); + + Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); + + Try<Owned<cluster::Slave>> slave = + StartSlave(&detector, &mockContainerizer); + + ASSERT_SOME(slave); + + // Wait for the agent 
to finish recovery. + AWAIT_READY(__recover); + Clock::settle(); + + // Expect a 404 for waiting on unknown containers. + { + EXPECT_CALL(mockContainerizer, wait(_)) + .WillOnce(Return(Future<Option<ContainerTermination>>(None()))); + + v1::agent::Call call; + call.set_type(v1::agent::Call::WAIT_NESTED_CONTAINER); + + v1::ContainerID unknownContainerId; + unknownContainerId.set_value(UUID::random().toString()); + + call.mutable_wait_nested_container()->mutable_container_id() + ->CopyFrom(unknownContainerId); + + Future<Response> response = process::http::post( + slave.get()->pid, + "api/v1", + createBasicAuthHeaders(DEFAULT_CREDENTIAL), + serialize(contentType, call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(NotFound().status, response); + } + + // The destructor of `cluster::Slave` will try to clean up any + // remaining containers by inspecting the result of `containers()`. + EXPECT_CALL(mockContainerizer, containers()) + .WillRepeatedly(Return(hashset<ContainerID>())); +} + + +TEST_P(AgentAPITest, NestedContainerKillNotFound) +{ + ContentType contentType = GetParam(); + + Clock::pause(); + + StandaloneMasterDetector detector; + MockContainerizer mockContainerizer; + + EXPECT_CALL(mockContainerizer, recover(_)) + .WillOnce(Return(Future<Nothing>(Nothing()))); + + Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); + + Try<Owned<cluster::Slave>> slave = + StartSlave(&detector, &mockContainerizer); + + ASSERT_SOME(slave); + + // Wait for the agent to finish recovery. + AWAIT_READY(__recover); + Clock::settle(); + + // Expect a 404 for killing unknown containers. 
+ { + EXPECT_CALL(mockContainerizer, destroy(_)) + .WillOnce(Return(Future<bool>(false))); + + v1::agent::Call call; + call.set_type(v1::agent::Call::KILL_NESTED_CONTAINER); + + v1::ContainerID unknownContainerId; + unknownContainerId.set_value(UUID::random().toString()); + + call.mutable_kill_nested_container()->mutable_container_id() + ->CopyFrom(unknownContainerId); + + Future<Response> response = process::http::post( + slave.get()->pid, + "api/v1", + createBasicAuthHeaders(DEFAULT_CREDENTIAL), + serialize(contentType, call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(NotFound().status, response); + } + + // The destructor of `cluster::Slave` will try to clean up any + // remaining containers by inspecting the result of `containers()`. + EXPECT_CALL(mockContainerizer, containers()) + .WillRepeatedly(Return(hashset<ContainerID>())); +} + + +TEST_P(AgentAPITest, NestedContainerLaunch) +{ + ContentType contentType = GetParam(); + + Clock::pause(); + + StandaloneMasterDetector detector; + MockContainerizer mockContainerizer; + + EXPECT_CALL(mockContainerizer, recover(_)) + .WillOnce(Return(Future<Nothing>(Nothing()))); + + Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); + + Try<Owned<cluster::Slave>> slave = + StartSlave(&detector, &mockContainerizer); + + ASSERT_SOME(slave); + + // Wait for the agent to finish recovery. + AWAIT_READY(__recover); + Clock::settle(); + + // Launch a nested container and wait for it to finish. 
+ v1::ContainerID containerId; + containerId.set_value(UUID::random().toString()); + + { + EXPECT_CALL(mockContainerizer, launch(_, _, _, _)) + .WillOnce(Return(Future<Nothing>(Nothing()))); + + v1::agent::Call call; + call.set_type(v1::agent::Call::LAUNCH_NESTED_CONTAINER); + + call.mutable_launch_nested_container()->mutable_container_id() + ->CopyFrom(containerId); + + call.mutable_launch_nested_container()->mutable_container_id() + ->mutable_parent()->set_value(UUID::random().toString()); + + Future<Response> response = process::http::post( + slave.get()->pid, + "api/v1", + createBasicAuthHeaders(DEFAULT_CREDENTIAL), + serialize(contentType, call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + } + + Promise<Option<ContainerTermination>> waitPromise; + Future<v1::agent::Response> wait; + + { + EXPECT_CALL(mockContainerizer, wait(_)) + .WillOnce(Return(waitPromise.future())); + + v1::agent::Call call; + call.set_type(v1::agent::Call::WAIT_NESTED_CONTAINER); + + call.mutable_wait_nested_container()->mutable_container_id() + ->CopyFrom(containerId); + + wait = post(slave.get()->pid, call, contentType); + + Clock::settle(); + + EXPECT_TRUE(wait.isPending()); + } + + // Now kill the nested container. 
+ { + EXPECT_CALL(mockContainerizer, destroy(_)) + .WillOnce(Return(Future<bool>(true))); + + v1::agent::Call call; + call.set_type(v1::agent::Call::KILL_NESTED_CONTAINER); + + call.mutable_kill_nested_container()->mutable_container_id() + ->CopyFrom(containerId); + + Future<Response> response = process::http::post( + slave.get()->pid, + "api/v1", + createBasicAuthHeaders(DEFAULT_CREDENTIAL), + serialize(contentType, call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + } + + ContainerTermination termination; + termination.set_status(1); + + waitPromise.set(Option<ContainerTermination>::some(termination)); + + AWAIT_READY(wait); + ASSERT_EQ(v1::agent::Response::WAIT_NESTED_CONTAINER, wait->type()); + EXPECT_EQ(1, wait->wait_nested_container().exit_status()); + + // The destructor of `cluster::Slave` will try to clean up any + // remaining containers by inspecting the result of `containers()`. + EXPECT_CALL(mockContainerizer, containers()) + .WillRepeatedly(Return(hashset<ContainerID>())); +} + } // namespace tests { } // namespace internal { } // namespace mesos {