This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 31ed8e4c17d46e8bc36cbd61c23ef87af42a8ec0
Author: Qian Zhang <zhq527...@gmail.com>
AuthorDate: Mon Jan 6 16:46:26 2020 +0800

    Added a test `ROOT_CGROUPS_CFS_CommandTaskLimits`.
    
    Review: https://reviews.apache.org/r/71956
---
 src/tests/containerizer/cgroups_isolator_tests.cpp | 191 +++++++++++++++++++++
 1 file changed, 191 insertions(+)

diff --git a/src/tests/containerizer/cgroups_isolator_tests.cpp b/src/tests/containerizer/cgroups_isolator_tests.cpp
index f9f427e..71d4604 100644
--- a/src/tests/containerizer/cgroups_isolator_tests.cpp
+++ b/src/tests/containerizer/cgroups_isolator_tests.cpp
@@ -538,6 +538,197 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_CommandTaskNoLimits)
 }
 
 
+// This test verifies that a task launched with resource limits specified
+// will have its CPU and memory soft & hard limits and OOM score adjustment
+// set correctly, and that it cannot consume more cpu time than its CFS quota.
+TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_CommandTaskLimits)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Start an agent with 2 CPUs, the total host memory, and 1024MB of disk.
+  Try<os::Memory> memory = os::memory();
+  ASSERT_SOME(memory);
+
+  uint64_t totalMemInMB = memory->total.bytes() / 1024 / 1024;
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "cgroups/cpu,cgroups/mem";
+  flags.resources =
+    strings::format("cpus:2;mem:%d;disk:1024", totalMemInMB).get();
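+  // For illustration: on a hypothetical host with 8192MB of memory this
+  // yields "cpus:2;mem:8192;disk:1024".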
+
+  Fetcher fetcher(flags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, true, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<MesosContainerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(
+      detector.get(),
+      containerizer.get(),
+      flags);
+
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched,
+      DEFAULT_FRAMEWORK_INFO,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  // Generate random numbers to max out a single core. We'll run
+  // this for 0.5 seconds of wall time so it should consume
+  // approximately 300 ms of total cpu time when limited to 0.6
+  // cpu. We use /dev/urandom to prevent blocking on Linux when
+  // there's insufficient entropy.
+  string command =
+    "cat /dev/urandom > /dev/null & "
+    "export MESOS_TEST_PID=$! && "
+    "sleep 0.5 && "
+    "kill $MESOS_TEST_PID";
+
+  // Launch a task with a 0.2 cpu request, a 0.5 cpu limit, a memory request
+  // of half the host total memory minus `DEFAULT_EXECUTOR_MEM`, and a memory
+  // limit of half the host total memory.
+  string resourceRequests = strings::format(
+      "cpus:0.2;mem:%d;disk:1024",
+      totalMemInMB / 2 - DEFAULT_EXECUTOR_MEM.bytes() / 1024 / 1024).get();
+
+  Value::Scalar cpuLimit, memLimit;
+  cpuLimit.set_value(0.5);
+  memLimit.set_value(totalMemInMB / 2);
+
+  google::protobuf::Map<string, Value::Scalar> resourceLimits;
+  resourceLimits.insert({"cpus", cpuLimit});
+  resourceLimits.insert({"mem", memLimit});
+
+  TaskInfo task = createTask(
+      offers.get()[0].slave_id(),
+      Resources::parse(resourceRequests).get(),
+      command,
+      None(),
+      "test-task",
+      id::UUID::random().toString(),
+      resourceLimits);
+
+  Future<TaskStatus> statusStarting;
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+  Future<hashset<ContainerID>> containers = containerizer->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(1u, containers->size());
+
+  ContainerID containerId = *(containers->begin());
+
+  Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
+  ASSERT_SOME(cpuHierarchy);
+
+  Result<string> memoryHierarchy = cgroups::hierarchy("memory");
+  ASSERT_SOME(memoryHierarchy);
+
+  string cgroup = path::join(flags.cgroups_root, containerId.value());
+
+  // The command executor will be given 0.1 cpu (`DEFAULT_EXECUTOR_CPUS`)
+  // by default, so in total the CPU shares of the executor container
+  // should correspond to 0.3 cpus.
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.3),
+      cgroups::cpu::shares(cpuHierarchy.get(), cgroup));
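+  // For illustration: assuming `CPU_SHARES_PER_CPU` takes the conventional
+  // cgroup value of 1024, the expected shares work out to
+  // (uint64_t)(1024 * 0.3) = 307.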
+
+  // The 0.1 cpu given to the command executor is also included in the cpu
+  // limit, so in total the CFS quota should correspond to 0.6 cpus.
+  Try<Duration> cfsQuota =
+    cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), cgroup);
+
+  ASSERT_SOME(cfsQuota);
+
+  double expectedCFSQuota =
+    (cpuLimit.value() + DEFAULT_EXECUTOR_CPUS) * CPU_CFS_PERIOD.ms();
+
+  EXPECT_EQ(expectedCFSQuota, cfsQuota->ms());
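+  // For illustration: assuming the default 100ms CFS period for
+  // `CPU_CFS_PERIOD`, the expected quota works out to
+  // (0.5 + 0.1) * 100ms = 60ms of cpu time per period.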
+
+  // The command executor will be given 32MB (`DEFAULT_EXECUTOR_MEM`) by
+  // default, so in total the memory soft limit should be half of host
+  // total memory.
+  EXPECT_SOME_EQ(
+      Megabytes(totalMemInMB / 2),
+      cgroups::memory::soft_limit_in_bytes(memoryHierarchy.get(), cgroup));
+
+  // The 32MB memory given to the command executor is also included in the
+  // memory limit, so in total the memory limit should be half of host total
+  // memory + 32MB.
+  EXPECT_SOME_EQ(
+      Megabytes(memLimit.value()) + DEFAULT_EXECUTOR_MEM,
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), cgroup));
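+  // For illustration, on the same hypothetical 8192MB host: the soft limit
+  // is (4096 - 32) + 32 = 4096MB and the hard limit is 4096 + 32 = 4128MB.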
+
+  Future<ContainerStatus> status = containerizer->status(containerId);
+  AWAIT_READY(status);
+  ASSERT_TRUE(status->has_executor_pid());
+
+  // Ensure the OOM score adjustment is correctly set for the container.
+  Try<string> read = os::read(
+      strings::format("/proc/%d/oom_score_adj", status->executor_pid()).get());
+
+  ASSERT_SOME(read);
+
+  // Since the memory request is half of host total memory (note that
+  // `DEFAULT_EXECUTOR_MEM` is also included in the memory request), the
+  // OOM score adjustment should be about 500; see
+  // `MemorySubsystemProcess::isolate` for the detailed algorithm.
+  Try<int32_t> oomScoreAdj = numify<int32_t>(strings::trim(read.get()));
+  ASSERT_SOME(oomScoreAdj);
+
+  EXPECT_GT(502, oomScoreAdj.get());
+  EXPECT_LT(498, oomScoreAdj.get());
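+  // For illustration: the adjustment appears to track the memory request's
+  // share of host total memory, so a request of about half the host memory
+  // presumably maps to roughly 1000 * 0.5 = 500.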
+
+  Future<ResourceStatistics> usage = containerizer->usage(containerId);
+  AWAIT_READY(usage);
+
+  // Expect that no more than 400 ms of cpu time has been consumed. We
+  // also check that at least 50 ms of cpu time has been consumed so
+  // this test will fail if the host system is very heavily loaded.
+  // This behavior is correct because under such conditions we aren't
+  // actually testing the CFS cpu limiter.
+  double cpuTime = usage->cpus_system_time_secs() +
+                   usage->cpus_user_time_secs();
+
+  EXPECT_GE(0.4, cpuTime);
+  EXPECT_LE(0.05, cpuTime);
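+  // For reference: the expected consumption is roughly 0.5s of wall time
+  // multiplied by the 0.6 cpu quota, i.e. about 0.3s, comfortably inside
+  // the [0.05s, 0.4s] window checked above.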
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test verifies the limit swap functionality. Note that we use
 // the default executor here in order to exercise both the increasing
 // and decreasing of the memory limit.
