Supported getting all containers in the agent API.

Review: https://reviews.apache.org/r/64639
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7de21b1c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7de21b1c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7de21b1c Branch: refs/heads/master Commit: 7de21b1ccaf0da00eba42a4a4a687ba2ef6f07b4 Parents: a814362 Author: Jie Yu <[email protected]> Authored: Fri Dec 15 08:37:05 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Fri Dec 15 13:56:43 2017 -0800 ---------------------------------------------------------------------- src/internal/evolve.cpp | 18 +- src/slave/containerizer/mesos/containerizer.cpp | 4 - src/slave/http.cpp | 278 +++++++++++++------ src/slave/http.hpp | 5 +- src/tests/agent_container_api_tests.cpp | 48 ++++ 5 files changed, 258 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/internal/evolve.cpp ---------------------------------------------------------------------- diff --git a/src/internal/evolve.cpp b/src/internal/evolve.cpp index 6ce6150..7758c9b 100644 --- a/src/internal/evolve.cpp +++ b/src/internal/evolve.cpp @@ -685,19 +685,25 @@ v1::agent::Response evolve<v1::agent::Response::GET_CONTAINERS>( Result<JSON::String> framework_id = object.find<JSON::String>("framework_id"); - CHECK_SOME(framework_id); - container->mutable_framework_id()->set_value(framework_id.get().value); + CHECK(!framework_id.isError()); + if (framework_id.isSome()) { + container->mutable_framework_id()->set_value(framework_id.get().value); + } Result<JSON::String> executor_id = object.find<JSON::String>("executor_id"); - CHECK_SOME(executor_id); - container->mutable_executor_id()->set_value(executor_id.get().value); + CHECK(!executor_id.isError()); + if (executor_id.isSome()) { + container->mutable_executor_id()->set_value(executor_id.get().value); + } Result<JSON::String> executor_name = 
object.find<JSON::String>("executor_name"); - CHECK_SOME(executor_name); - container->set_executor_name(executor_name.get().value); + CHECK(!executor_name.isError()); + if (executor_name.isSome()) { + container->set_executor_name(executor_name.get().value); + } Result<JSON::Object> container_status = object.find<JSON::Object>("status"); if (container_status.isSome()) { http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/slave/containerizer/mesos/containerizer.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp index 616873e..1a398a8 100644 --- a/src/slave/containerizer/mesos/containerizer.cpp +++ b/src/slave/containerizer/mesos/containerizer.cpp @@ -2194,8 +2194,6 @@ Future<ResourceStatistics> _usage( const Option<Resources>& resources, const list<Future<ResourceStatistics>>& statistics) { - CHECK(!containerId.has_parent()); - ResourceStatistics result; // Set the timestamp now we have all statistics. 
@@ -2232,8 +2230,6 @@ Future<ResourceStatistics> _usage( Future<ResourceStatistics> MesosContainerizerProcess::usage( const ContainerID& containerId) { - CHECK(!containerId.has_parent()); - if (!containers_.contains(containerId)) { return Failure("Unknown container " + stringify(containerId)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/slave/http.cpp ---------------------------------------------------------------------- diff --git a/src/slave/http.cpp b/src/slave/http.cpp index ed22b9f..3e20c90 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -72,6 +72,8 @@ #include "slave/slave.hpp" #include "slave/validation.hpp" +#include "slave/containerizer/mesos/paths.hpp" + #include "version/version.hpp" using mesos::agent::ProcessIO; @@ -2166,10 +2168,26 @@ Future<Response> Http::getContainers( AuthorizationAcceptor::create( principal, slave->authorizer, authorization::VIEW_CONTAINER); - return authorizeContainer.then(defer(slave->self(), - [this](const Owned<AuthorizationAcceptor>& authorizeContainer) { - // Use an empty container ID filter. - return __containers(authorizeContainer, None()); + Future<Owned<AuthorizationAcceptor>> authorizeStandaloneContainer = + AuthorizationAcceptor::create( + principal, slave->authorizer, authorization::VIEW_STANDALONE_CONTAINER); + + return collect(authorizeContainer, authorizeStandaloneContainer) + .then(defer( + slave->self(), + [this, call](const tuple<Owned<AuthorizationAcceptor>, + Owned<AuthorizationAcceptor>>& acceptors) { + Owned<AuthorizationAcceptor> authorizeContainer; + Owned<AuthorizationAcceptor> authorizeStandaloneContainer; + tie(authorizeContainer, authorizeStandaloneContainer) = acceptors; + + // Use an empty container ID filter. 
+ return __containers( + authorizeContainer, + authorizeStandaloneContainer, + None(), + call.get_containers().show_nested(), + call.get_containers().show_standalone()); })).then([acceptType](const Future<JSON::Array>& result) -> Future<Response> { if (!result.isReady()) { @@ -2198,19 +2216,36 @@ Future<Response> Http::_containers( Future<Owned<AuthorizationAcceptor>> authorizeContainer = AuthorizationAcceptor::create( principal, slave->authorizer, authorization::VIEW_CONTAINER); + + Future<Owned<AuthorizationAcceptor>> authorizeStandaloneContainer = + AuthorizationAcceptor::create( + principal, slave->authorizer, authorization::VIEW_STANDALONE_CONTAINER); + Future<IDAcceptor<ContainerID>> selectContainerId = IDAcceptor<ContainerID>(request.url.query.get("container_id")); - return collect(authorizeContainer, selectContainerId) + return collect(authorizeContainer, + authorizeStandaloneContainer, + selectContainerId) .then(defer( slave->self(), [this](const tuple<Owned<AuthorizationAcceptor>, + Owned<AuthorizationAcceptor>, IDAcceptor<ContainerID>>& acceptors) { Owned<AuthorizationAcceptor> authorizeContainer; + Owned<AuthorizationAcceptor> authorizeStandaloneContainer; Option<IDAcceptor<ContainerID>> selectContainerId; - tie(authorizeContainer, selectContainerId) = acceptors; - return __containers(authorizeContainer, selectContainerId); + tie(authorizeContainer, + authorizeStandaloneContainer, + selectContainerId) = acceptors; + + return __containers( + authorizeContainer, + authorizeStandaloneContainer, + selectContainerId, + false, + false); })).then([request](const Future<JSON::Array>& result) -> Future<Response> { if (!result.isReady()) { LOG(WARNING) << "Could not collect container status and statistics: " @@ -2231,96 +2266,171 @@ Future<Response> Http::_containers( Future<JSON::Array> Http::__containers( Owned<AuthorizationAcceptor> authorizeContainer, - Option<IDAcceptor<ContainerID>> selectContainerId) const + Owned<AuthorizationAcceptor> 
authorizeStandaloneContainer, + Option<IDAcceptor<ContainerID>> selectContainerId, + bool showNestedContainers, + bool showStandaloneContainers) const { - Owned<list<JSON::Object>> metadata(new list<JSON::Object>()); - list<Future<ContainerStatus>> statusFutures; - list<Future<ResourceStatistics>> statsFutures; + return slave->containerizer->containers() + .then(defer(slave->self(), [=](const hashset<ContainerID> containerIds) { + Owned<list<JSON::Object>> metadata(new list<JSON::Object>()); + list<Future<ContainerStatus>> statusFutures; + list<Future<ResourceStatistics>> statsFutures; + + hashset<ContainerID> executorContainerIds; + hashset<ContainerID> authorizedExecutorContainerIds; + + foreachvalue (const Framework* framework, slave->frameworks) { + foreachvalue (const Executor* executor, framework->executors) { + // No need to get statistics and status if we know that the + // executor has already terminated. + if (executor->state == Executor::TERMINATED) { + continue; + } - foreachvalue (const Framework* framework, slave->frameworks) { - foreachvalue (const Executor* executor, framework->executors) { - // No need to get statistics and status if we know that the - // executor has already terminated. 
- if (executor->state == Executor::TERMINATED) { - continue; - } + const ExecutorInfo& info = executor->info; + const ContainerID& containerId = executor->containerId; - const ExecutorInfo& info = executor->info; - const ContainerID& containerId = executor->containerId; + executorContainerIds.insert(containerId); - if ((selectContainerId.isSome() && - !selectContainerId->accept(containerId)) || - !authorizeContainer->accept(info, framework->info)) { - continue; + if ((selectContainerId.isSome() && + !selectContainerId->accept(containerId)) || + !authorizeContainer->accept(info, framework->info)) { + continue; + } + + authorizedExecutorContainerIds.insert(containerId); + + JSON::Object entry; + entry.values["framework_id"] = info.framework_id().value(); + entry.values["executor_id"] = info.executor_id().value(); + entry.values["executor_name"] = info.name(); + entry.values["source"] = info.source(); + entry.values["container_id"] = containerId.value(); + + metadata->push_back(entry); + statusFutures.push_back(slave->containerizer->status(containerId)); + statsFutures.push_back(slave->containerizer->usage(containerId)); + } } - JSON::Object entry; - entry.values["framework_id"] = info.framework_id().value(); - entry.values["executor_id"] = info.executor_id().value(); - entry.values["executor_name"] = info.name(); - entry.values["source"] = info.source(); - entry.values["container_id"] = containerId.value(); + foreach (const ContainerID& containerId, containerIds) { + if (executorContainerIds.contains(containerId)) { + continue; + } - metadata->push_back(entry); - statusFutures.push_back(slave->containerizer->status(containerId)); - statsFutures.push_back(slave->containerizer->usage(containerId)); - } - } + if (selectContainerId.isSome() && + !selectContainerId->accept(containerId)) { + continue; + } - return await(await(statusFutures), await(statsFutures)).then( - [metadata](const tuple< - Future<list<Future<ContainerStatus>>>, - 
Future<list<Future<ResourceStatistics>>>>& t) - -> Future<JSON::Array> { - const list<Future<ContainerStatus>>& status = std::get<0>(t).get(); - const list<Future<ResourceStatistics>>& stats = std::get<1>(t).get(); - CHECK_EQ(status.size(), stats.size()); - CHECK_EQ(status.size(), metadata->size()); - - JSON::Array result; - - auto statusIter = status.begin(); - auto statsIter = stats.begin(); - auto metadataIter = metadata->begin(); - - while (statusIter != status.end() && - statsIter != stats.end() && - metadataIter != metadata->end()) { - JSON::Object& entry = *metadataIter; - - if (statusIter->isReady()) { - entry.values["status"] = JSON::protobuf(statusIter->get()); - } else { - LOG(WARNING) << "Failed to get container status for executor '" - << entry.values["executor_id"] << "'" - << " of framework " - << entry.values["framework_id"] << ": " - << (statusIter->isFailed() - ? statusIter->failure() - : "discarded"); - } + const bool isNestedContainer = containerId.has_parent(); - if (statsIter->isReady()) { - entry.values["statistics"] = JSON::protobuf(statsIter->get()); - } else { - LOG(WARNING) << "Failed to get resource statistics for executor '" - << entry.values["executor_id"] << "'" - << " of framework " - << entry.values["framework_id"] << ": " - << (statsIter->isFailed() - ? statsIter->failure() - : "discarded"); - } + // TODO(jieyu): Only MesosContainerizer supports standalone + // container currently. Thus it's ok to call + // MesosContainerizer-specific method here. If we want to + // support other Containerizers, we should make this a + // Containerizer interface. + const bool isStandaloneContainer = + containerizer::paths::isStandaloneContainer( + slave->flags.runtime_dir, + containerId); + + // For nested containers, authorization is always based on + // its root container. 
+ ContainerID rootContainerId = protobuf::getRootContainerId(containerId); - result.values.push_back(entry); + const bool isRootContainerStandalone = + containerizer::paths::isStandaloneContainer( + slave->flags.runtime_dir, + rootContainerId); - statusIter++; - statsIter++; - metadataIter++; + if (isNestedContainer && !showNestedContainers) { + continue; } - return result; - }); + if (isStandaloneContainer && !showStandaloneContainers) { + continue; + } + + if (isRootContainerStandalone && + !authorizeStandaloneContainer->accept()) { + continue; + } + + if (!isRootContainerStandalone && + !authorizedExecutorContainerIds.contains(rootContainerId)) { + continue; + } + + JSON::Object entry; + entry.values["container_id"] = containerId.value(); + + metadata->push_back(entry); + statusFutures.push_back(slave->containerizer->status(containerId)); + statsFutures.push_back(slave->containerizer->usage(containerId)); + } + + return await(await(statusFutures), await(statsFutures)).then( + [metadata](const tuple< + Future<list<Future<ContainerStatus>>>, + Future<list<Future<ResourceStatistics>>>>& t) + -> Future<JSON::Array> { + const list<Future<ContainerStatus>>& status = + std::get<0>(t).get(); + + const list<Future<ResourceStatistics>>& stats = + std::get<1>(t).get(); + + CHECK_EQ(status.size(), stats.size()); + CHECK_EQ(status.size(), metadata->size()); + + JSON::Array result; + + auto statusIter = status.begin(); + auto statsIter = stats.begin(); + auto metadataIter = metadata->begin(); + + while (statusIter != status.end() && + statsIter != stats.end() && + metadataIter != metadata->end()) { + JSON::Object& entry = *metadataIter; + + if (statusIter->isReady()) { + entry.values["status"] = JSON::protobuf(statusIter->get()); + } else { + LOG(WARNING) << "Failed to get container status for executor '" + << entry.values["executor_id"] << "'" + << " of framework " + << entry.values["framework_id"] << ": " + << (statusIter->isFailed() + ? 
statusIter->failure() + : "discarded"); + } + + if (statsIter->isReady()) { + entry.values["statistics"] = JSON::protobuf(statsIter->get()); + } else { + LOG(WARNING) + << "Failed to get resource statistics for executor '" + << entry.values["executor_id"] << "'" + << " of framework " + << entry.values["framework_id"] << ": " + << (statsIter->isFailed() + ? statsIter->failure() + : "discarded"); + } + + result.values.push_back(entry); + + statusIter++; + statsIter++; + metadataIter++; + } + + return result; + }); + })); } http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/slave/http.hpp ---------------------------------------------------------------------- diff --git a/src/slave/http.hpp b/src/slave/http.hpp index 9ca0617..3cdbf98 100644 --- a/src/slave/http.hpp +++ b/src/slave/http.hpp @@ -117,7 +117,10 @@ private: // Helper function to collect containers status and resource statistics. process::Future<JSON::Array> __containers( process::Owned<AuthorizationAcceptor> authorizeContainer, - Option<IDAcceptor<ContainerID>> selectContainerId) const; + process::Owned<AuthorizationAcceptor> authorizeStandaloneContainer, + Option<IDAcceptor<ContainerID>> selectContainerId, + bool showNestedContainers, + bool showStandaloneContainers) const; // Helper routines for endpoint authorization. 
Try<std::string> extractEndpoint(const process::http::URL& url) const; http://git-wip-us.apache.org/repos/asf/mesos/blob/7de21b1c/src/tests/agent_container_api_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/agent_container_api_tests.cpp b/src/tests/agent_container_api_tests.cpp index c2c15eb..6185692 100644 --- a/src/tests/agent_container_api_tests.cpp +++ b/src/tests/agent_container_api_tests.cpp @@ -421,6 +421,17 @@ public: return post(slave, call); } + Future<http::Response> getContainers( + const process::PID<slave::Slave>& slave) + { + v1::agent::Call call; + call.set_type(v1::agent::Call::GET_CONTAINERS); + call.mutable_get_containers()->set_show_nested(true); + call.mutable_get_containers()->set_show_standalone(true); + + return post(slave, call); + } + protected: virtual void TearDown() { @@ -836,6 +847,43 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS( EXPECT_TRUE(checkWaitContainerResponse(wait, SIGKILL)); } + +// This test verifies the GET_CONTAINERS API call. +TEST_P_TEMP_DISABLED_ON_WINDOWS(AgentContainerAPITest, GetContainers) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.launcher = std::get<1>(std::get<3>(GetParam())); + slaveFlags.isolation = std::get<0>(std::get<3>(GetParam())); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + Try<v1::ContainerID> parentContainerId = + launchParentContainer(master.get()->pid, slave.get()->pid); + + ASSERT_SOME(parentContainerId); + + // Launch a nested container and wait for it to finish. 
+ v1::ContainerID containerId; + containerId.set_value(id::UUID::random().toString()); + containerId.mutable_parent()->CopyFrom(parentContainerId.get()); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::OK().status, + launchNestedContainer(slave.get()->pid, containerId)); + + Future<v1::agent::Response> response = + deserialize(getContainers(slave.get()->pid)); + + ASSERT_EQ(v1::agent::Response::GET_CONTAINERS, response->type()); + EXPECT_EQ(2, response->get_containers().containers_size()); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
