Repository: mesos Updated Branches: refs/heads/1.5.x 49e534954 -> 5ad9e052a
Introduced wrappers for cgroups subsystem access. Different cgroups subsystems are modelled as actors. In this patch we introduce wrapper classes which `dispatch` to the processes. This removes, e.g., races caused by mixing direct (naked) method calls with `dispatch`ed ones. Review: https://reviews.apache.org/r/66635/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/30d21a95 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/30d21a95 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/30d21a95 Branch: refs/heads/1.5.x Commit: 30d21a95e44f5f2b3a961af06fb51293c92882e4 Parents: c44c299 Author: Benjamin Bannier <[email protected]> Authored: Fri May 11 00:54:32 2018 -0700 Committer: Gilbert Song <[email protected]> Committed: Fri May 11 00:59:35 2018 -0700 ---------------------------------------------------------------------- .../mesos/isolators/cgroups/cgroups.cpp | 46 ++----- .../mesos/isolators/cgroups/cgroups.hpp | 9 +- .../mesos/isolators/cgroups/subsystem.cpp | 126 ++++++++++++++++++- .../mesos/isolators/cgroups/subsystem.hpp | 79 ++++++++++-- 4 files changed, 207 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/30d21a95/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp index a03e0ad..6d663a5 100644 --- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp +++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp @@ -57,7 +57,7 @@ namespace slave { CgroupsIsolatorProcess::CgroupsIsolatorProcess( const Flags& _flags, const hashmap<string, string>& _hierarchies, - const multihashmap<string, Owned<SubsystemProcess>>& _subsystems) + const multihashmap<string, 
Owned<Subsystem>>& _subsystems) : ProcessBase(process::ID::generate("cgroups-isolator")), flags(_flags), hierarchies(_hierarchies), @@ -73,7 +73,7 @@ Try<Isolator*> CgroupsIsolatorProcess::create(const Flags& flags) hashmap<string, string> hierarchies; // Hierarchy path -> subsystem object. - multihashmap<string, Owned<SubsystemProcess>> subsystems; + multihashmap<string, Owned<Subsystem>> subsystems; // Multimap: isolator name -> subsystem name. multihashmap<string, string> isolatorMap = { @@ -124,8 +124,8 @@ Try<Isolator*> CgroupsIsolatorProcess::create(const Flags& flags) } // Create and load the subsystem. - Try<Owned<SubsystemProcess>> subsystem = - SubsystemProcess::create(flags, subsystemName, hierarchy.get()); + Try<Owned<Subsystem>> subsystem = + Subsystem::create(flags, subsystemName, hierarchy.get()); if (subsystem.isError()) { return Error( @@ -157,23 +157,6 @@ bool CgroupsIsolatorProcess::supportsStandalone() } -void CgroupsIsolatorProcess::initialize() -{ - foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) { - spawn(subsystem.get()); - } -} - - -void CgroupsIsolatorProcess::finalize() -{ - foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) { - terminate(subsystem.get()); - wait(subsystem.get()); - } -} - - Future<Nothing> CgroupsIsolatorProcess::recover( const list<ContainerState>& states, const hashset<ContainerID>& orphans) @@ -338,8 +321,7 @@ Future<Nothing> CgroupsIsolatorProcess::___recover( continue; } - foreach ( - const Owned<SubsystemProcess>& subsystem, subsystems.get(hierarchy)) { + foreach (const Owned<Subsystem>& subsystem, subsystems.get(hierarchy)) { recoveredSubsystems.insert(subsystem->name()); recovers.push_back(subsystem->recover(containerId, cgroup)); } @@ -439,8 +421,7 @@ Future<Option<ContainerLaunchInfo>> CgroupsIsolatorProcess::prepare( "'" + path + "': " + create.error()); } - foreach ( - const Owned<SubsystemProcess>& subsystem, subsystems.get(hierarchy)) { + foreach (const Owned<Subsystem>& 
subsystem, subsystems.get(hierarchy)) { infos[containerId]->subsystems.insert(subsystem->name()); prepares.push_back(subsystem->prepare( containerId, @@ -578,7 +559,7 @@ Future<Nothing> CgroupsIsolatorProcess::isolate( } list<Future<Nothing>> isolates; - foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) { + foreachvalue (const Owned<Subsystem>& subsystem, subsystems) { isolates.push_back(subsystem->isolate( containerId, infos[containerId]->cgroup, @@ -629,7 +610,7 @@ Future<ContainerLimitation> CgroupsIsolatorProcess::watch( return Failure("Unknown container"); } - foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) { + foreachvalue (const Owned<Subsystem>& subsystem, subsystems) { if (infos[containerId]->subsystems.contains(subsystem->name())) { subsystem->watch(containerId, infos[containerId]->cgroup) .onAny(defer( @@ -671,7 +652,7 @@ Future<Nothing> CgroupsIsolatorProcess::update( } list<Future<Nothing>> updates; - foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) { + foreachvalue (const Owned<Subsystem>& subsystem, subsystems) { if (infos[containerId]->subsystems.contains(subsystem->name())) { updates.push_back(subsystem->update( containerId, @@ -722,7 +703,7 @@ Future<ResourceStatistics> CgroupsIsolatorProcess::usage( } list<Future<ResourceStatistics>> usages; - foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) { + foreachvalue (const Owned<Subsystem>& subsystem, subsystems) { if (infos[containerId]->subsystems.contains(subsystem->name())) { usages.push_back(subsystem->usage( containerId, @@ -765,7 +746,7 @@ Future<ContainerStatus> CgroupsIsolatorProcess::status( } list<Future<ContainerStatus>> statuses; - foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) { + foreachvalue (const Owned<Subsystem>& subsystem, subsystems) { if (infos[containerId]->subsystems.contains(subsystem->name())) { statuses.push_back(subsystem->status( containerId, @@ -808,7 +789,7 @@ Future<Nothing> 
CgroupsIsolatorProcess::cleanup( } list<Future<Nothing>> cleanups; - foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) { + foreachvalue (const Owned<Subsystem>& subsystem, subsystems) { if (infos[containerId]->subsystems.contains(subsystem->name())) { cleanups.push_back(subsystem->cleanup( containerId, @@ -850,8 +831,7 @@ Future<Nothing> CgroupsIsolatorProcess::_cleanup( // TODO(haosdent): Use foreachkey once MESOS-5037 is resolved. foreach (const string& hierarchy, subsystems.keys()) { - foreach ( - const Owned<SubsystemProcess>& subsystem, subsystems.get(hierarchy)) { + foreach (const Owned<Subsystem>& subsystem, subsystems.get(hierarchy)) { if (infos[containerId]->subsystems.contains(subsystem->name())) { destroys.push_back(cgroups::destroy( hierarchy, http://git-wip-us.apache.org/repos/asf/mesos/blob/30d21a95/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp index 9dcd5da..f47b16e 100644 --- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp +++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp @@ -81,11 +81,6 @@ public: virtual process::Future<Nothing> cleanup( const ContainerID& containerId); -protected: - virtual void initialize(); - - virtual void finalize(); - private: struct Info { @@ -107,7 +102,7 @@ private: CgroupsIsolatorProcess( const Flags& _flags, const hashmap<std::string, std::string>& _hierarchies, - const multihashmap<std::string, process::Owned<SubsystemProcess>>& + const multihashmap<std::string, process::Owned<Subsystem>>& _subsystems); process::Future<Nothing> _recover( @@ -162,7 +157,7 @@ private: // /cgroup/memory -> memory // As we see, subsystem 'cpu' and 'cpuacct' are co-mounted at // '/cgroup/cpu,cpuacct'. 
- multihashmap<std::string, process::Owned<SubsystemProcess>> subsystems; + multihashmap<std::string, process::Owned<Subsystem>> subsystems; // Store cgroups associated information for containers. hashmap<ContainerID, process::Owned<Info>> infos; http://git-wip-us.apache.org/repos/asf/mesos/blob/30d21a95/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp index 1ba6ec4..dc6c7aa 100644 --- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp +++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp @@ -14,6 +14,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include <utility> + #include <stout/error.hpp> #include <stout/hashmap.hpp> @@ -43,7 +45,27 @@ namespace mesos { namespace internal { namespace slave { -Try<Owned<SubsystemProcess>> SubsystemProcess::create( +Subsystem::Subsystem(Owned<SubsystemProcess> _process) + : process(std::move(_process)) +{ + process::spawn(process.get()); +} + + +Subsystem::~Subsystem() +{ + process::terminate(process.get()); + process::wait(process.get()); +} + + +string Subsystem::name() const +{ + return process->name(); +} + + +Try<Owned<Subsystem>> Subsystem::create( const Flags& flags, const string& name, const string& hierarchy) @@ -76,7 +98,107 @@ Try<Owned<SubsystemProcess>> SubsystemProcess::create( subsystemProcess.error()); } - return subsystemProcess.get(); + return Owned<Subsystem>(new Subsystem(subsystemProcess.get())); +} + + +Future<Nothing> Subsystem::recover( + const ContainerID& containerId, + const string& cgroup) +{ + return process::dispatch( + process.get(), + &SubsystemProcess::recover, + containerId, + cgroup); +} + + +Future<Nothing> Subsystem::prepare( + const ContainerID& containerId, + const string& cgroup) 
+{ + return process::dispatch( + process.get(), + &SubsystemProcess::prepare, + containerId, + cgroup); +} + + +Future<Nothing> Subsystem::isolate( + const ContainerID& containerId, + const string& cgroup, + pid_t pid) +{ + return process::dispatch( + process.get(), + &SubsystemProcess::isolate, + containerId, + cgroup, + pid); +} + + +Future<mesos::slave::ContainerLimitation> Subsystem::watch( + const ContainerID& containerId, + const string& cgroup) +{ + return process::dispatch( + process.get(), + &SubsystemProcess::watch, + containerId, + cgroup); +} + + +Future<Nothing> Subsystem::update( + const ContainerID& containerId, + const string& cgroup, + const Resources& resources) +{ + return process::dispatch( + process.get(), + &SubsystemProcess::update, + containerId, + cgroup, + resources); +} + + +Future<ResourceStatistics> Subsystem::usage( + const ContainerID& containerId, + const string& cgroup) +{ + return process::dispatch( + process.get(), + &SubsystemProcess::usage, + containerId, + cgroup); +} + + +Future<ContainerStatus> Subsystem::status( + const ContainerID& containerId, + const string& cgroup) +{ + return process::dispatch( + process.get(), + &SubsystemProcess::status, + containerId, + cgroup); +} + + +Future<Nothing> Subsystem::cleanup( + const ContainerID& containerId, + const string& cgroup) +{ + return process::dispatch( + process.get(), + &SubsystemProcess::cleanup, + containerId, + cgroup); } http://git-wip-us.apache.org/repos/asf/mesos/blob/30d21a95/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp index c99a00c..5ae8253 100644 --- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp +++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp @@ -36,10 +36,12 @@ namespace mesos { namespace internal { 
namespace slave { +class SubsystemProcess; + /** * An abstraction for cgroups subsystem. */ -class SubsystemProcess : public process::Process<SubsystemProcess> +class Subsystem { public: /** @@ -51,19 +53,26 @@ public: * @param hierarchy The hierarchy path of cgroups subsystem. * @return A specific `Subsystem` object or an error if `create` fails. */ - static Try<process::Owned<SubsystemProcess>> create( + static Try<process::Owned<Subsystem>> create( const Flags& flags, const std::string& name, const std::string& hierarchy); - virtual ~SubsystemProcess() {} + // We have unique ownership of the wrapped process and + // enforce that objects of this class cannot be copied. + // + // TODO(bbannier): Remove this once MESOS-5122 is resolved. + Subsystem(const Subsystem&) = delete; + Subsystem& operator=(const Subsystem&) = delete; + + ~Subsystem(); /** * The cgroups subsystem name of this `Subsystem` object. * * @return The cgroups subsystem name. */ - virtual std::string name() const = 0; + std::string name() const; /** * Recover the cgroups subsystem for the associated container. @@ -72,7 +81,7 @@ public: * @param cgroup The target cgroup. * @return Nothing or an error if `recover` fails. */ - virtual process::Future<Nothing> recover( + process::Future<Nothing> recover( const ContainerID& containerId, const std::string& cgroup); @@ -83,7 +92,7 @@ public: * @param cgroup The target cgroup. * @return Nothing or an error if `prepare` fails. */ - virtual process::Future<Nothing> prepare( + process::Future<Nothing> prepare( const ContainerID& containerId, const std::string& cgroup); @@ -95,7 +104,7 @@ public: * @param pid The process id of container. * @return Nothing or an error if `isolate` fails. */ - virtual process::Future<Nothing> isolate( + process::Future<Nothing> isolate( const ContainerID& containerId, const std::string& cgroup, pid_t pid); @@ -108,7 +117,7 @@ public: * @return The resource limitation that impacts the container or an * error if `watch` fails. 
*/ - virtual process::Future<mesos::slave::ContainerLimitation> watch( + process::Future<mesos::slave::ContainerLimitation> watch( const ContainerID& containerId, const std::string& cgroup); @@ -121,7 +130,7 @@ public: * @param resources The resources need to update. * @return Nothing or an error if `update` fails. */ - virtual process::Future<Nothing> update( + process::Future<Nothing> update( const ContainerID& containerId, const std::string& cgroup, const Resources& resources); @@ -135,7 +144,7 @@ public: * @return The resource usage statistics or an error if gather statistics * fails. */ - virtual process::Future<ResourceStatistics> usage( + process::Future<ResourceStatistics> usage( const ContainerID& containerId, const std::string& cgroup); @@ -147,7 +156,7 @@ public: * @param cgroup The target cgroup. * @return The container status or an error if get fails. */ - virtual process::Future<ContainerStatus> status( + process::Future<ContainerStatus> status( const ContainerID& containerId, const std::string& cgroup); @@ -163,6 +172,54 @@ public: * @param cgroup The target cgroup. * @return Nothing or an error if `cleanup` fails. 
*/ + process::Future<Nothing> cleanup( + const ContainerID& containerId, + const std::string& cgroup); + +private: + explicit Subsystem(process::Owned<SubsystemProcess> process); + + process::Owned<SubsystemProcess> process; +}; + + +class SubsystemProcess : public process::Process<SubsystemProcess> +{ +public: + virtual ~SubsystemProcess() {} + + virtual std::string name() const = 0; + + virtual process::Future<Nothing> recover( + const ContainerID& containerId, + const std::string& cgroup); + + virtual process::Future<Nothing> prepare( + const ContainerID& containerId, + const std::string& cgroup); + + virtual process::Future<Nothing> isolate( + const ContainerID& containerId, + const std::string& cgroup, + pid_t pid); + + virtual process::Future<mesos::slave::ContainerLimitation> watch( + const ContainerID& containerId, + const std::string& cgroup); + + virtual process::Future<Nothing> update( + const ContainerID& containerId, + const std::string& cgroup, + const Resources& resources); + + virtual process::Future<ResourceStatistics> usage( + const ContainerID& containerId, + const std::string& cgroup); + + virtual process::Future<ContainerStatus> status( + const ContainerID& containerId, + const std::string& cgroup); + virtual process::Future<Nothing> cleanup( const ContainerID& containerId, const std::string& cgroup);
