Introduced wrapper for access to cgroups system access.

Different cgroups subsystems are modelled as actors. In this patch we
introduce wrapper classes which `dispatch` to the processes. This
removes e.g., races from mixing naked and `dispatch`'ed method calls.

Review: https://reviews.apache.org/r/66635/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6f50af2f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6f50af2f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6f50af2f

Branch: refs/heads/1.4.x
Commit: 6f50af2f7fadc6817098d5c1a50d8b8a950f0432
Parents: 43167ed
Author: Benjamin Bannier <[email protected]>
Authored: Fri May 11 00:54:32 2018 -0700
Committer: Gilbert Song <[email protected]>
Committed: Fri May 11 01:04:29 2018 -0700

----------------------------------------------------------------------
 .../mesos/isolators/cgroups/cgroups.cpp         |  46 ++-----
 .../mesos/isolators/cgroups/cgroups.hpp         |   9 +-
 .../mesos/isolators/cgroups/subsystem.cpp       | 126 ++++++++++++++++++-
 .../mesos/isolators/cgroups/subsystem.hpp       |  79 ++++++++++--
 4 files changed, 207 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6f50af2f/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp 
b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
index ec0b4a6..54b5c15 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.cpp
@@ -57,7 +57,7 @@ namespace slave {
 CgroupsIsolatorProcess::CgroupsIsolatorProcess(
     const Flags& _flags,
     const hashmap<string, string>& _hierarchies,
-    const multihashmap<string, Owned<SubsystemProcess>>& _subsystems)
+    const multihashmap<string, Owned<Subsystem>>& _subsystems)
   : ProcessBase(process::ID::generate("cgroups-isolator")),
     flags(_flags),
     hierarchies(_hierarchies),
@@ -73,7 +73,7 @@ Try<Isolator*> CgroupsIsolatorProcess::create(const Flags& 
flags)
   hashmap<string, string> hierarchies;
 
   // Hierarchy path -> subsystem object.
-  multihashmap<string, Owned<SubsystemProcess>> subsystems;
+  multihashmap<string, Owned<Subsystem>> subsystems;
 
   // Multimap: isolator name -> subsystem name.
   multihashmap<string, string> isolatorMap = {
@@ -124,8 +124,8 @@ Try<Isolator*> CgroupsIsolatorProcess::create(const Flags& 
flags)
       }
 
       // Create and load the subsystem.
-      Try<Owned<SubsystemProcess>> subsystem =
-        SubsystemProcess::create(flags, subsystemName, hierarchy.get());
+      Try<Owned<Subsystem>> subsystem =
+        Subsystem::create(flags, subsystemName, hierarchy.get());
 
       if (subsystem.isError()) {
         return Error(
@@ -151,23 +151,6 @@ bool CgroupsIsolatorProcess::supportsNesting()
 }
 
 
-void CgroupsIsolatorProcess::initialize()
-{
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
-    spawn(subsystem.get());
-  }
-}
-
-
-void CgroupsIsolatorProcess::finalize()
-{
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
-    terminate(subsystem.get());
-    wait(subsystem.get());
-  }
-}
-
-
 Future<Nothing> CgroupsIsolatorProcess::recover(
     const list<ContainerState>& states,
     const hashset<ContainerID>& orphans)
@@ -332,8 +315,7 @@ Future<Nothing> CgroupsIsolatorProcess::___recover(
       continue;
     }
 
-    foreach (
-        const Owned<SubsystemProcess>& subsystem, subsystems.get(hierarchy)) {
+    foreach (const Owned<Subsystem>& subsystem, subsystems.get(hierarchy)) {
       recoveredSubsystems.insert(subsystem->name());
       recovers.push_back(subsystem->recover(containerId, cgroup));
     }
@@ -433,8 +415,7 @@ Future<Option<ContainerLaunchInfo>> 
CgroupsIsolatorProcess::prepare(
           "'" + path + "': " + create.error());
     }
 
-    foreach (
-        const Owned<SubsystemProcess>& subsystem, subsystems.get(hierarchy)) {
+    foreach (const Owned<Subsystem>& subsystem, subsystems.get(hierarchy)) {
       infos[containerId]->subsystems.insert(subsystem->name());
       prepares.push_back(subsystem->prepare(
           containerId,
@@ -575,7 +556,7 @@ Future<Nothing> CgroupsIsolatorProcess::isolate(
   }
 
   list<Future<Nothing>> isolates;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     isolates.push_back(subsystem->isolate(
         containerId,
         infos[containerId]->cgroup,
@@ -626,7 +607,7 @@ Future<ContainerLimitation> CgroupsIsolatorProcess::watch(
     return Failure("Unknown container");
   }
 
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       subsystem->watch(containerId, infos[containerId]->cgroup)
         .onAny(defer(
@@ -668,7 +649,7 @@ Future<Nothing> CgroupsIsolatorProcess::update(
   }
 
   list<Future<Nothing>> updates;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       updates.push_back(subsystem->update(
           containerId,
@@ -719,7 +700,7 @@ Future<ResourceStatistics> CgroupsIsolatorProcess::usage(
   }
 
   list<Future<ResourceStatistics>> usages;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       usages.push_back(subsystem->usage(
           containerId,
@@ -762,7 +743,7 @@ Future<ContainerStatus> CgroupsIsolatorProcess::status(
   }
 
   list<Future<ContainerStatus>> statuses;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       statuses.push_back(subsystem->status(
           containerId,
@@ -805,7 +786,7 @@ Future<Nothing> CgroupsIsolatorProcess::cleanup(
   }
 
   list<Future<Nothing>> cleanups;
-  foreachvalue (const Owned<SubsystemProcess>& subsystem, subsystems) {
+  foreachvalue (const Owned<Subsystem>& subsystem, subsystems) {
     if (infos[containerId]->subsystems.contains(subsystem->name())) {
       cleanups.push_back(subsystem->cleanup(
           containerId,
@@ -847,8 +828,7 @@ Future<Nothing> CgroupsIsolatorProcess::_cleanup(
 
   // TODO(haosdent): Use foreachkey once MESOS-5037 is resolved.
   foreach (const string& hierarchy, subsystems.keys()) {
-    foreach (
-        const Owned<SubsystemProcess>& subsystem, subsystems.get(hierarchy)) {
+    foreach (const Owned<Subsystem>& subsystem, subsystems.get(hierarchy)) {
       if (infos[containerId]->subsystems.contains(subsystem->name())) {
         destroys.push_back(cgroups::destroy(
             hierarchy,

http://git-wip-us.apache.org/repos/asf/mesos/blob/6f50af2f/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp 
b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
index d846dde..3245a5f 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/cgroups.hpp
@@ -80,11 +80,6 @@ public:
   virtual process::Future<Nothing> cleanup(
       const ContainerID& containerId);
 
-protected:
-  virtual void initialize();
-
-  virtual void finalize();
-
 private:
   struct Info
   {
@@ -106,7 +101,7 @@ private:
   CgroupsIsolatorProcess(
       const Flags& _flags,
       const hashmap<std::string, std::string>& _hierarchies,
-      const multihashmap<std::string, process::Owned<SubsystemProcess>>&
+      const multihashmap<std::string, process::Owned<Subsystem>>&
         _subsystems);
 
   process::Future<Nothing> _recover(
@@ -161,7 +156,7 @@ private:
   //   /cgroup/memory      -> memory
   // As we see, subsystem 'cpu' and 'cpuacct' are co-mounted at
   // '/cgroup/cpu,cpuacct'.
-  multihashmap<std::string, process::Owned<SubsystemProcess>> subsystems;
+  multihashmap<std::string, process::Owned<Subsystem>> subsystems;
 
   // Store cgroups associated information for containers.
   hashmap<ContainerID, process::Owned<Info>> infos;

http://git-wip-us.apache.org/repos/asf/mesos/blob/6f50af2f/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp 
b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
index 1ba6ec4..dc6c7aa 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.cpp
@@ -14,6 +14,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <utility>
+
 #include <stout/error.hpp>
 #include <stout/hashmap.hpp>
 
@@ -43,7 +45,27 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
-Try<Owned<SubsystemProcess>> SubsystemProcess::create(
+Subsystem::Subsystem(Owned<SubsystemProcess> _process)
+  : process(std::move(_process))
+{
+  process::spawn(process.get());
+}
+
+
+Subsystem::~Subsystem()
+{
+  process::terminate(process.get());
+  process::wait(process.get());
+}
+
+
+string Subsystem::name() const
+{
+  return process->name();
+}
+
+
+Try<Owned<Subsystem>> Subsystem::create(
     const Flags& flags,
     const string& name,
     const string& hierarchy)
@@ -76,7 +98,107 @@ Try<Owned<SubsystemProcess>> SubsystemProcess::create(
         subsystemProcess.error());
   }
 
-  return subsystemProcess.get();
+  return Owned<Subsystem>(new Subsystem(subsystemProcess.get()));
+}
+
+
+Future<Nothing> Subsystem::recover(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::recover,
+      containerId,
+      cgroup);
+}
+
+
+Future<Nothing> Subsystem::prepare(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::prepare,
+      containerId,
+      cgroup);
+}
+
+
+Future<Nothing> Subsystem::isolate(
+    const ContainerID& containerId,
+    const string& cgroup,
+    pid_t pid)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::isolate,
+      containerId,
+      cgroup,
+      pid);
+}
+
+
+Future<mesos::slave::ContainerLimitation> Subsystem::watch(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::watch,
+      containerId,
+      cgroup);
+}
+
+
+Future<Nothing> Subsystem::update(
+    const ContainerID& containerId,
+    const string& cgroup,
+    const Resources& resources)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::update,
+      containerId,
+      cgroup,
+      resources);
+}
+
+
+Future<ResourceStatistics> Subsystem::usage(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::usage,
+      containerId,
+      cgroup);
+}
+
+
+Future<ContainerStatus> Subsystem::status(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::status,
+      containerId,
+      cgroup);
+}
+
+
+Future<Nothing> Subsystem::cleanup(
+    const ContainerID& containerId,
+    const string& cgroup)
+{
+  return process::dispatch(
+      process.get(),
+      &SubsystemProcess::cleanup,
+      containerId,
+      cgroup);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6f50af2f/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp 
b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
index c99a00c..5ae8253 100644
--- a/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
+++ b/src/slave/containerizer/mesos/isolators/cgroups/subsystem.hpp
@@ -36,10 +36,12 @@ namespace mesos {
 namespace internal {
 namespace slave {
 
+class SubsystemProcess;
+
 /**
  * An abstraction for cgroups subsystem.
  */
-class SubsystemProcess : public process::Process<SubsystemProcess>
+class Subsystem
 {
 public:
   /**
@@ -51,19 +53,26 @@ public:
    * @param hierarchy The hierarchy path of cgroups subsystem.
    * @return A specific `Subsystem` object or an error if `create` fails.
    */
-  static Try<process::Owned<SubsystemProcess>> create(
+  static Try<process::Owned<Subsystem>> create(
       const Flags& flags,
       const std::string& name,
       const std::string& hierarchy);
 
-  virtual ~SubsystemProcess() {}
+  // We have unique ownership of the wrapped process and
+  // enforce that objects of this class cannot be copied.
+  //
+  // TODO(bbannier): Remove this once MESOS-5122 is resolved.
+  Subsystem(const Subsystem&) = delete;
+  Subsystem& operator=(const Subsystem&) = delete;
+
+  ~Subsystem();
 
   /**
    * The cgroups subsystem name of this `Subsystem` object.
    *
    * @return The cgroups subsystem name.
    */
-  virtual std::string name() const = 0;
+  std::string name() const;
 
   /**
    * Recover the cgroups subsystem for the associated container.
@@ -72,7 +81,7 @@ public:
    * @param cgroup The target cgroup.
    * @return Nothing or an error if `recover` fails.
    */
-  virtual process::Future<Nothing> recover(
+  process::Future<Nothing> recover(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -83,7 +92,7 @@ public:
    * @param cgroup The target cgroup.
    * @return Nothing or an error if `prepare` fails.
    */
-  virtual process::Future<Nothing> prepare(
+  process::Future<Nothing> prepare(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -95,7 +104,7 @@ public:
    * @param pid The process id of container.
    * @return Nothing or an error if `isolate` fails.
    */
-  virtual process::Future<Nothing> isolate(
+  process::Future<Nothing> isolate(
       const ContainerID& containerId,
       const std::string& cgroup,
       pid_t pid);
@@ -108,7 +117,7 @@ public:
    * @return The resource limitation that impacts the container or an
    *     error if `watch` fails.
    */
-  virtual process::Future<mesos::slave::ContainerLimitation> watch(
+  process::Future<mesos::slave::ContainerLimitation> watch(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -121,7 +130,7 @@ public:
    * @param resources The resources need to update.
    * @return Nothing or an error if `update` fails.
    */
-  virtual process::Future<Nothing> update(
+  process::Future<Nothing> update(
       const ContainerID& containerId,
       const std::string& cgroup,
       const Resources& resources);
@@ -135,7 +144,7 @@ public:
    * @return The resource usage statistics or an error if gather statistics
    *     fails.
    */
-  virtual process::Future<ResourceStatistics> usage(
+  process::Future<ResourceStatistics> usage(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -147,7 +156,7 @@ public:
    * @param cgroup The target cgroup.
    * @return The container status or an error if get fails.
    */
-  virtual process::Future<ContainerStatus> status(
+  process::Future<ContainerStatus> status(
       const ContainerID& containerId,
       const std::string& cgroup);
 
@@ -163,6 +172,54 @@ public:
    * @param cgroup The target cgroup.
    * @return Nothing or an error if `cleanup` fails.
    */
+  process::Future<Nothing> cleanup(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+private:
+  explicit Subsystem(process::Owned<SubsystemProcess> process);
+
+  process::Owned<SubsystemProcess> process;
+};
+
+
+class SubsystemProcess : public process::Process<SubsystemProcess>
+{
+public:
+  virtual ~SubsystemProcess() {}
+
+  virtual std::string name() const = 0;
+
+  virtual process::Future<Nothing> recover(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+  virtual process::Future<Nothing> prepare(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+  virtual process::Future<Nothing> isolate(
+      const ContainerID& containerId,
+      const std::string& cgroup,
+      pid_t pid);
+
+  virtual process::Future<mesos::slave::ContainerLimitation> watch(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+  virtual process::Future<Nothing> update(
+      const ContainerID& containerId,
+      const std::string& cgroup,
+      const Resources& resources);
+
+  virtual process::Future<ResourceStatistics> usage(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
+  virtual process::Future<ContainerStatus> status(
+      const ContainerID& containerId,
+      const std::string& cgroup);
+
   virtual process::Future<Nothing> cleanup(
       const ContainerID& containerId,
       const std::string& cgroup);

Reply via email to