This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a06ff8ee57c60b95e84cab1930b21c6e8a6f95a1
Author: Qian Zhang <[email protected]>
AuthorDate: Sat Jan 11 10:10:42 2020 +0800

    Added a test `CgroupsIsolatorTest.ROOT_CGROUPS_CFS_TaskGroupLimits`.

    Review: https://reviews.apache.org/r/71983
---
 src/tests/containerizer/cgroups_isolator_tests.cpp | 443 +++++++++++++++++++++
 1 file changed, 443 insertions(+)

diff --git a/src/tests/containerizer/cgroups_isolator_tests.cpp b/src/tests/containerizer/cgroups_isolator_tests.cpp
index 3578f88..f4425f0 100644
--- a/src/tests/containerizer/cgroups_isolator_tests.cpp
+++ b/src/tests/containerizer/cgroups_isolator_tests.cpp
@@ -28,6 +28,7 @@
 #include "slave/gc_process.hpp"
 
 #include "slave/containerizer/mesos/containerizer.hpp"
+#include "slave/containerizer/mesos/paths.hpp"
 
 #include "slave/containerizer/mesos/isolators/cgroups/constants.hpp"
 #include "slave/containerizer/mesos/isolators/cgroups/subsystems/net_cls.hpp"
@@ -66,10 +67,14 @@ using mesos::internal::slave::NetClsHandle;
 using mesos::internal::slave::NetClsHandleManager;
 using mesos::internal::slave::Slave;
 
+using mesos::internal::slave::containerizer::paths::getCgroupPath;
+
 using mesos::master::detector::MasterDetector;
 
+using mesos::v1::scheduler::Call;
 using mesos::v1::scheduler::Event;
 
+using process::Clock;
 using process::Future;
 using process::Owned;
 using process::Queue;
@@ -82,6 +87,7 @@ using std::string;
 using std::vector;
 
 using testing::_;
+using testing::AllOf;
 using testing::DoAll;
 using testing::InvokeWithoutArgs;
 using testing::Return;
@@ -849,6 +855,443 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_CommandTaskInfiniteLimits)
 }
 
 
+// This test verifies that the default executor container's CPU and memory
+// soft & hard limits can be updated correctly when launching task groups
+// and killing tasks, and also verifies that each task's CPU and memory
+// soft & hard limits can be set correctly.
+TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_TaskGroupLimits)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "cgroups/cpu,cgroups/mem";
+  flags.cgroups_enable_cfs = true;
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher(flags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, true, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<MesosContainerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), flags);
+
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers1;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      "cpus:0.1;mem:32;disk:32",
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+
+  AWAIT_READY(offers1);
+  ASSERT_FALSE(offers1->offers().empty());
+
+  const v1::Offer& offer1 = offers1->offers(0);
+  const v1::AgentID& agentId = offer1.agent_id();
+
+  // Launch the first task group, which has two tasks: task1 has no
+  // resource limits specified, while task2 does.
+  v1::TaskInfo taskInfo1 = v1::createTask(
+      agentId,
+      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
+      SLEEP_COMMAND(1000));
+
+  mesos::v1::Value::Scalar cpuLimit, memLimit;
+  cpuLimit.set_value(0.5);
+  memLimit.set_value(64);
+
+  google::protobuf::Map<string, mesos::v1::Value::Scalar> resourceLimits;
+  resourceLimits.insert({"cpus", cpuLimit});
+  resourceLimits.insert({"mem", memLimit});
+
+  v1::TaskInfo taskInfo2 = v1::createTask(
+      agentId,
+      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
+      SLEEP_COMMAND(1000),
+      None(),
+      "test-task",
+      id::UUID::random().toString(),
+      resourceLimits);
+
+  taskInfo1.mutable_container()->set_type(mesos::v1::ContainerInfo::MESOS);
+  taskInfo1.mutable_container()->mutable_linux_info()->set_share_cgroups(false);
+  taskInfo2.mutable_container()->set_type(mesos::v1::ContainerInfo::MESOS);
+  taskInfo2.mutable_container()->mutable_linux_info()->set_share_cgroups(false);
+
+  Future<v1::scheduler::Event::Update> startingUpdate1;
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> killedUpdate1;
+
+  testing::Sequence task1;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate2;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> killedUpdate2;
+
+  testing::Sequence task2;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Offers> offers2;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return());
+
+  {
+    v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
+        executorInfo,
+        v1::createTaskGroupInfo({taskInfo1, taskInfo2}));
+
+    Call call = v1::createCallAccept(frameworkId, offer1, {launchGroup});
+
+    // Set a 0s filter to immediately get another offer to launch
+    // the second task group.
+    call.mutable_accept()->mutable_filters()->set_refuse_seconds(0);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(startingUpdate1);
+  AWAIT_READY(runningUpdate1);
+
+  AWAIT_READY(startingUpdate2);
+  AWAIT_READY(runningUpdate2);
+
+  Future<hashset<ContainerID>> containers = containerizer->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(3u, containers->size());
+
+  // Get task container IDs.
+  const v1::ContainerStatus& containerStatus1 =
+    runningUpdate1->status().container_status();
+
+  ASSERT_TRUE(containerStatus1.has_container_id());
+  ASSERT_TRUE(containerStatus1.container_id().has_parent());
+
+  const v1::ContainerID& taskContainerId1 = containerStatus1.container_id();
+
+  const v1::ContainerStatus& containerStatus2 =
+    runningUpdate2->status().container_status();
+
+  ASSERT_TRUE(containerStatus2.has_container_id());
+  ASSERT_TRUE(containerStatus2.container_id().has_parent());
+
+  const v1::ContainerID& taskContainerId2 = containerStatus2.container_id();
+
+  EXPECT_EQ(taskContainerId1.parent(), taskContainerId2.parent());
+
+  // Get the executor container ID.
+  const v1::ContainerID& executorContainerId = taskContainerId1.parent();
+
+  Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
+  ASSERT_SOME(cpuHierarchy);
+
+  Result<string> memoryHierarchy = cgroups::hierarchy("memory");
+  ASSERT_SOME(memoryHierarchy);
+
+  const string& executorCgroup =
+    path::join(flags.cgroups_root, executorContainerId.value());
+
+  const string& taskCgroup1 =
+    getCgroupPath(flags.cgroups_root, devolve(taskContainerId1));
+
+  const string& taskCgroup2 =
+    getCgroupPath(flags.cgroups_root, devolve(taskContainerId2));
+
+  // The CPU shares of the executor container are the sum of its own CPU
+  // request (0.1) + task1's CPU request (0.1) + task2's CPU request (0.1),
+  // i.e. 0.3.
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.3),
+      cgroups::cpu::shares(cpuHierarchy.get(), executorCgroup));
+
+  // The CPU shares of task1 are its CPU request (0.1).
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.1),
+      cgroups::cpu::shares(cpuHierarchy.get(), taskCgroup1));
+
+  // The CPU shares of task2 are its CPU request (0.1).
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.1),
+      cgroups::cpu::shares(cpuHierarchy.get(), taskCgroup2));
+
+  // The CFS quota of the executor container is the sum of its own CPU
+  // request (0.1) + task1's CPU request (0.1) + task2's CPU limit (0.5),
+  // i.e. 0.7.
+  Try<Duration> cfsQuota =
+    cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), executorCgroup);
+
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.7 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The CFS quota of task1 is its CPU request (0.1).
+  cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), taskCgroup1);
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.1 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The CFS quota of task2 is its CPU limit (0.5).
+  cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), taskCgroup2);
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.5 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The memory soft limit of the executor container is the sum of its
+  // own memory request (32MB) + task1's memory request (32MB) + task2's
+  // memory request (32MB), i.e. 96MB.
+  EXPECT_SOME_EQ(
+      Megabytes(96),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), executorCgroup));
+
+  // The memory soft limit of task1 is its memory request (32MB).
+  EXPECT_SOME_EQ(
+      Megabytes(32),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), taskCgroup1));
+
+  // The memory soft limit of task2 is its memory request (32MB).
+  EXPECT_SOME_EQ(
+      Megabytes(32),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), taskCgroup2));
+
+  // The memory hard limit of the executor container is the sum of its
+  // own memory request (32MB) + task1's memory request (32MB) + task2's
+  // memory limit (64MB), i.e. 128MB.
+  EXPECT_SOME_EQ(
+      Megabytes(128),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), executorCgroup));
+
+  // The memory hard limit of task1 is its memory request (32MB).
+  EXPECT_SOME_EQ(
+      Megabytes(32),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), taskCgroup1));
+
+  // The memory hard limit of task2 is its memory limit (64MB).
+  EXPECT_SOME_EQ(
+      Megabytes(64),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), taskCgroup2));
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+  Clock::resume();
+
+  AWAIT_READY(offers2);
+  ASSERT_FALSE(offers2->offers().empty());
+
+  const v1::Offer& offer2 = offers2->offers(0);
+
+  // Launch the second task group, which has only one task (task3); this
+  // task has no resource limits specified.
+  v1::TaskInfo taskInfo3 = v1::createTask(
+      agentId,
+      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
+      SLEEP_COMMAND(1000));
+
+  taskInfo3.mutable_container()->set_type(mesos::v1::ContainerInfo::MESOS);
+  taskInfo3.mutable_container()->mutable_linux_info()->set_share_cgroups(false);
+
+  Future<v1::scheduler::Event::Update> startingUpdate3;
+  Future<v1::scheduler::Event::Update> runningUpdate3;
+
+  testing::Sequence task3;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task3)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate3),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task3)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate3),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer2,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo3}))}));
+
+  AWAIT_READY(startingUpdate3);
+  AWAIT_READY(runningUpdate3);
+
+  // The CPU shares of the executor container are the sum of its own CPU
+  // request (0.1) + task1's CPU request (0.1) + task2's CPU request (0.1)
+  // + task3's CPU request (0.1), i.e. 0.4.
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.4),
+      cgroups::cpu::shares(cpuHierarchy.get(), executorCgroup));
+
+  // The CFS quota of the executor container is the sum of its own CPU
+  // request (0.1) + task1's CPU request (0.1) + task2's CPU limit (0.5)
+  // + task3's CPU request (0.1), i.e. 0.8.
+  cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), executorCgroup);
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.8 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The memory soft limit of the executor container is the sum of its
+  // own memory request (32MB) + task1's memory request (32MB) + task2's
+  // memory request (32MB) + task3's memory request (32MB), i.e. 128MB.
+  EXPECT_SOME_EQ(
+      Megabytes(128),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), executorCgroup));
+
+  // The memory hard limit of the executor container is the sum of its
+  // own memory request (32MB) + task1's memory request (32MB) + task2's
+  // memory limit (64MB) + task3's memory request (32MB), i.e. 160MB.
+  EXPECT_SOME_EQ(
+      Megabytes(160),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), executorCgroup));
+
+  // Now kill a task in the first task group.
+  mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id()));
+
+  // Both tasks in the first task group will be killed.
+  AWAIT_READY(killedUpdate1);
+  AWAIT_READY(killedUpdate2);
+
+  // The CPU shares of the executor container are the sum of its own CPU
+  // request (0.1) + task3's CPU request (0.1), i.e. 0.2.
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.2),
+      cgroups::cpu::shares(cpuHierarchy.get(), executorCgroup));
+
+  // The CFS quota of the executor container is also the sum of its own CPU
+  // request (0.1) + task3's CPU request (0.1), i.e. 0.2.
+  cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), executorCgroup);
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.2 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The memory soft limit of the executor container is the sum of its
+  // own memory request (32MB) + task3's memory request (32MB), i.e. 64MB.
+  EXPECT_SOME_EQ(
+      Megabytes(64),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), executorCgroup));
+
+  // We only update the memory hard limit when it is set for the first time
+  // or when we're raising the existing limit (see
+  // `MemorySubsystemProcess::update` for details), so the memory hard limit
+  // of the executor container should still be 160MB.
+  EXPECT_SOME_EQ(
+      Megabytes(160),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), executorCgroup));
+}
+
+
 // This test verifies the limit swap functionality. Note that we use
 // the default executor here in order to exercise both the increasing
 // and decreasing of the memory limit.
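
For reference, the expected cgroup values asserted by the new test reduce to
simple arithmetic over the task group: the soft CPU and memory limits are
driven by resource requests alone, while the hard limits substitute a task's
limit for its request wherever a limit is set. The standalone sketch below is
not part of the commit; it works through the first task group's numbers and
assumes the CPU_SHARES_PER_CPU (1024) and CPU_CFS_PERIOD (100ms) constants
from src/slave/containerizer/mesos/isolators/cgroups/constants.hpp.

  // A standalone sketch (not part of the commit) of the arithmetic behind
  // the expectations in ROOT_CGROUPS_CFS_TaskGroupLimits.
  #include <cstdint>
  #include <iostream>
  #include <optional>
  #include <vector>

  struct Container
  {
    double cpuRequest;                  // "cpus" in the task's resources.
    std::optional<double> cpuLimit;     // Optional "cpus" resource limit.
    uint64_t memRequestMb;              // "mem" in the task's resources (MB).
    std::optional<uint64_t> memLimitMb; // Optional "mem" resource limit (MB).
  };

  int main()
  {
    const uint64_t CPU_SHARES_PER_CPU = 1024;
    const uint64_t CPU_CFS_PERIOD_US = 100000; // 100ms.

    // Executor (0.1 CPU, 32MB), task1 (no limits), task2 (limits of
    // 0.5 CPU and 64MB).
    std::vector<Container> group =
      {{0.1, {}, 32, {}}, {0.1, {}, 32, {}}, {0.1, 0.5, 32, 64}};

    double shares = 0, quotaCpus = 0;
    uint64_t softMb = 0, hardMb = 0;
    for (const Container& c : group) {
      shares += c.cpuRequest;                         // Soft CPU: requests only.
      quotaCpus += c.cpuLimit.value_or(c.cpuRequest); // Hard CPU: limit if set.
      softMb += c.memRequestMb;                       // Soft memory: requests.
      hardMb += c.memLimitMb.value_or(c.memRequestMb);
    }

    std::cout
      << "cpu.shares:       " << uint64_t(CPU_SHARES_PER_CPU * shares) << "\n"
      << "cpu.cfs_quota_us: " << uint64_t(quotaCpus * CPU_CFS_PERIOD_US) << "\n"
      << "memory soft/hard: " << softMb << "MB / " << hardMb << "MB\n";
    // Prints 307 (0.3 CPUs), 70000 (0.7 CPUs), and 96MB / 128MB, matching
    // the executor-level expectations above.
    return 0;
  }

In cgroup v1 terms, these four values are what the test reads back through
cpu.shares, cpu.cfs_quota_us, memory.soft_limit_in_bytes, and
memory.limit_in_bytes under the cpu and memory hierarchies.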

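The test's last expectation (the hard limit staying at 160MB after the first
task group is killed) depends on the update rule its final comment describes:
the memory hard limit is only written on first assignment or when it is being
raised, since lowering memory.limit_in_bytes below current usage could
OOM-kill still-running tasks. A minimal sketch of that rule, with illustrative
names; the real logic lives in `MemorySubsystemProcess::update`.

  #include <cstdint>
  #include <optional>

  // Returns the value to write to memory.limit_in_bytes, or nothing if the
  // current limit must be kept.
  std::optional<uint64_t> newHardLimit(
      std::optional<uint64_t> currentBytes, // Not yet set on first update.
      uint64_t desiredBytes)
  {
    // First time: always set. Afterwards: only raise, never lower, so a
    // shrinking task group (e.g. after tasks are killed) cannot trigger an
    // OOM kill of tasks that are still running.
    if (!currentBytes.has_value() || desiredBytes > *currentBytes) {
      return desiredBytes;
    }

    return std::nullopt;
  }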