Repository: mesos
Updated Branches:
  refs/heads/master cdcaca740 -> 24d640e2d


Expose the number of processes and threads in a container when cgroups are enabled.

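Expose the number of processes and threads in a container when cgroups are
enabled. Counting is opt-in via the new cgroups_cpu_enable_pids_and_tids_count
slave flag and is performed by the cgroups cpushare isolator.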

Review: https://reviews.apache.org/r/31250


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/24d640e2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/24d640e2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/24d640e2

Branch: refs/heads/master
Commit: 24d640e2d8836600bfc4634c204308e270b481da
Parents: cdcaca7
Author: Chi Zhang <[email protected]>
Authored: Mon Mar 16 12:44:01 2015 -0700
Committer: Ian Downes <[email protected]>
Committed: Mon Mar 16 12:56:02 2015 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                       |   3 +
 .../isolators/cgroups/cpushare.cpp              |  30 +++++
 src/slave/flags.hpp                             |   7 ++
 src/tests/isolator_tests.cpp                    | 110 +++++++++++++++++++
 4 files changed, 150 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/24d640e2/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 9df972d..ec8efae 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -437,6 +437,9 @@ message Resource {
 message ResourceStatistics {
   required double timestamp = 1; // Snapshot time, in seconds since the Epoch.
 
+  optional uint32 processes = 30; // Number of processes in the container.
+  optional uint32 threads = 31;   // Number of threads in the container.
+
   // CPU Usage Information:
   // Total CPU time spent in user mode, and kernel mode.
   optional double cpus_user_time_secs = 2;

http://git-wip-us.apache.org/repos/asf/mesos/blob/24d640e2/src/slave/containerizer/isolators/cgroups/cpushare.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/isolators/cgroups/cpushare.cpp b/src/slave/containerizer/isolators/cgroups/cpushare.cpp
index d496768..c4a5aec 100644
--- a/src/slave/containerizer/isolators/cgroups/cpushare.cpp
+++ b/src/slave/containerizer/isolators/cgroups/cpushare.cpp
@@ -417,6 +417,36 @@ Future<ResourceStatistics> CgroupsCpushareIsolatorProcess::usage(
 
   ResourceStatistics result;
 
+  // TODO(chzhcn): Counting processes and threads works with any
+  // cgroup subsystem, so it should not be tied to a specific cgroup
+  // isolator. A better home is probably the Linux launcher, which
+  // uses the cgroup freezer subsystem; that requires the launcher to
+  // adopt the new semantics of reporting subsystem-independent cgroup
+  // usage.
+  // NOTE: The cost is linear in the number of processes and threads
+  // in the container: the kernel allocates memory for the list of
+  // pids/tids and userspace parses the cgroup files to count them.
+  // If this becomes a bottleneck, some rate limiting is needed. Both
+  // counts are read from the 'cpuacct' hierarchy, and failing to
+  // read either one fails the whole usage() call.
+  if (flags.cgroups_cpu_enable_pids_and_tids_count) {
+    Try<std::set<pid_t>> pids =
+      cgroups::processes(hierarchies["cpuacct"], info->cgroup);
+    if (pids.isError()) {
+      return Failure("Failed to get number of processes: " + pids.error());
+    }
+
+    result.set_processes(pids.get().size());
+
+    Try<std::set<pid_t>> tids =
+      cgroups::threads(hierarchies["cpuacct"], info->cgroup);
+    if (tids.isError()) {
+      return Failure("Failed to get number of threads: " + tids.error());
+    }
+
+    result.set_threads(tids.get().size());
+  }
+
   // Get the number of clock ticks, used for cpu accounting.
   static long ticks = sysconf(_SC_CLK_TCK);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/24d640e2/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 56b25ca..dbaf5f5 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -226,6 +226,12 @@ public:
         "swap instead of just memory.\n",
         false);
 
+    add(&Flags::cgroups_cpu_enable_pids_and_tids_count,
+        "cgroups_cpu_enable_pids_and_tids_count",
+        "Cgroups feature flag to enable counting of processes and threads\n"
+        "inside a container.\n",
+        false);  // Opt-in: counting is linear in the number of pids/tids.
+
     add(&Flags::slave_subsystems,
         "slave_subsystems",
         "List of comma-separated cgroup subsystems to run the slave binary\n"
@@ -488,6 +494,7 @@ public:
   std::string cgroups_root;
   bool cgroups_enable_cfs;
   bool cgroups_limit_swap;
+  bool cgroups_cpu_enable_pids_and_tids_count; // Read by cpushare usage().
   Option<std::string> slave_subsystems;
   Option<std::string> perf_events;
   Duration perf_interval;

http://git-wip-us.apache.org/repos/asf/mesos/blob/24d640e2/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index 0936436..535e9af 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -544,6 +544,116 @@ TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Cfs_Big_Quota)
   delete launcher.get();
 }
 
+
+// Verifies the process and thread counts reported by usage() for a
+// container whose workload is a single, single-threaded process.
+TEST_F(LimitedCpuIsolatorTest, ROOT_CGROUPS_Pids_and_Tids)
+{
+  slave::Flags flags;
+  flags.cgroups_cpu_enable_pids_and_tids_count = true;
+
+  Try<Isolator*> isolator = CgroupsCpushareIsolatorProcess::create(flags);
+  CHECK_SOME(isolator);
+
+  Try<Launcher*> launcher = LinuxLauncher::create(flags);
+  CHECK_SOME(launcher);
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_resources()->CopyFrom(
+      Resources::parse("cpus:0.5;mem:512").get());
+
+  ContainerID containerId;
+  containerId.set_value("mesos_test_cpu_pids_and_tids");
+
+  // Create the directory under the current working directory so it
+  // gets cleaned up automatically with the test.
+  Try<string> dir = os::mkdtemp(path::join(os::getcwd(), "XXXXXX"));
+  ASSERT_SOME(dir);
+
+  AWAIT_READY(
+      isolator.get()->prepare(containerId, executorInfo, dir.get(), None()));
+
+  // Right after the creation of the cgroup, which happens in
+  // 'prepare', we check that it is empty.
+  Future<ResourceStatistics> usage = isolator.get()->usage(containerId);
+  AWAIT_READY(usage);
+  EXPECT_EQ(0U, usage.get().processes());
+  EXPECT_EQ(0U, usage.get().threads());
+
+  int pipes[2];
+  ASSERT_NE(-1, ::pipe(pipes));
+
+  vector<string> argv(3);
+  argv[0] = "sh";
+  argv[1] = "-c";
+  argv[2] = "exec sleep 1000"; // 'sleep' replaces 'sh': no extra fork.
+
+  Try<pid_t> pid = launcher.get()->fork(
+      containerId,
+      "/bin/sh",
+      argv,
+      Subprocess::FD(STDIN_FILENO),
+      Subprocess::FD(STDOUT_FILENO),
+      Subprocess::FD(STDERR_FILENO),
+      None(),
+      None(),
+      lambda::bind(&childSetup, pipes));
+
+  ASSERT_SOME(pid);
+
+  // Reap the forked child.
+  Future<Option<int>> status = process::reap(pid.get());
+
+  // Continue in the parent.
+  ASSERT_SOME(os::close(pipes[0]));
+
+  // Before isolation, the cgroup is empty.
+  usage = isolator.get()->usage(containerId);
+  AWAIT_READY(usage);
+  EXPECT_EQ(0U, usage.get().processes());
+  EXPECT_EQ(0U, usage.get().threads());
+
+  // Isolate the forked child.
+  AWAIT_READY(isolator.get()->isolate(containerId, pid.get()));
+
+  // After the isolation, the cgroup is not empty, even though the
+  // process hasn't exec'd yet.
+  usage = isolator.get()->usage(containerId);
+  AWAIT_READY(usage);
+  EXPECT_EQ(1U, usage.get().processes());
+  EXPECT_EQ(1U, usage.get().threads());
+
+  // Now signal the child to continue.
+  char dummy = '\0';
+  ASSERT_LT(0, ::write(pipes[1], &dummy, sizeof(dummy)));
+
+  ASSERT_SOME(os::close(pipes[1]));
+
+  // Still one process/thread: 'sh' exec'd into 'sleep' without forking.
+  usage = isolator.get()->usage(containerId);
+  AWAIT_READY(usage);
+  EXPECT_EQ(1U, usage.get().processes());
+  EXPECT_EQ(1U, usage.get().threads());
+
+  // Ensure all processes are killed.
+  AWAIT_READY(launcher.get()->destroy(containerId));
+
+  // Wait for the killed child to be reaped.
+  AWAIT_READY(status);
+
+  // After the process is killed, the cgroup should be empty again.
+  usage = isolator.get()->usage(containerId);
+  AWAIT_READY(usage);
+  EXPECT_EQ(0U, usage.get().processes());
+  EXPECT_EQ(0U, usage.get().threads());
+
+  // Let the isolator clean up.
+  AWAIT_READY(isolator.get()->cleanup(containerId));
+
+  delete isolator.get();
+  delete launcher.get();
+}
+
 #endif // __linux__
 
 template <typename T>

Reply via email to