This is an automated email from the ASF dual-hosted git repository. grag pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 72e78f02f08116c0d4c4a825144b078cfbee5052 Author: Greg Mann <[email protected]> AuthorDate: Fri Mar 20 10:35:37 2020 -0700 Added tests for master validation of limits and shared cgroups. Review: https://reviews.apache.org/r/72217/ --- src/tests/master_validation_tests.cpp | 771 ++++++++++++++++++++++++++++++++++ 1 file changed, 771 insertions(+) diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp index 8d5e74e..9efca42 100644 --- a/src/tests/master_validation_tests.cpp +++ b/src/tests/master_validation_tests.cpp @@ -3486,6 +3486,214 @@ TEST_F(TaskValidationTest, TaskSettingDockerParameterName) driver.join(); } + +TEST_F(TaskValidationTest, ResourceLimitLessThanRequest) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + Map<string, Value::Scalar> limits; + limits["cpus"].set_value(0.01); + + TaskInfo task = createTask( + offers->at(0), + "exit 0", + None(), + "test-task", + id::UUID::random().toString(), + limits); + + driver.launchTasks(offers->at(0).id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(TASK_ERROR, status->state()); + EXPECT_TRUE(strings::contains( + status->message(), + "The cpu limit must be greater than or equal to the cpu request")); + + driver.stop(); + driver.join(); +} + + +TEST_F(TaskValidationTest, LimitOtherThanCpuOrMem) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + Map<string, Value::Scalar> limits; + limits["disk"].set_value(128); + + TaskInfo task = createTask( + offers->at(0), + "exit 0", + None(), + "test-task", + id::UUID::random().toString(), + limits); + + driver.launchTasks(offers->at(0).id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(TASK_ERROR, status->state()); + EXPECT_TRUE(strings::contains( + status->message(), + "Only cpus and mem may be included in a task's resource limits")); + + driver.stop(); + driver.join(); +} + + +TEST_F(TaskValidationTest, NestedCgroupInLaunchOperation) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + TaskInfo task = createTask(offers->at(0), "exit 0"); + + task.mutable_container()->set_type(ContainerInfo::MESOS); + task.mutable_container()->mutable_linux_info()->set_share_cgroups(false); + + driver.launchTasks(offers->at(0).id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(TASK_ERROR, status->state()); + EXPECT_TRUE(strings::contains( + status->message(), + "Only tasks in a task group may have 'share_cgroups' set to 'false'")); + + driver.stop(); + driver.join(); +} + + +TEST_F(TaskValidationTest, SharedCgroupOnExecutor) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + const Offer& offer = offers->at(0); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + ExecutorID executorId; + executorId.set_value("test-executor"); + + TaskInfo task = createTask(offer, "exit 0", executorId); + task.mutable_executor()->mutable_container()->set_type(ContainerInfo::MESOS); + task.mutable_executor()->mutable_container() + ->mutable_linux_info()->set_share_cgroups(true); + + driver.launchTasks(offer.id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(TASK_ERROR, status->state()); + EXPECT_TRUE(strings::contains( + status->message(), + "The 'share_cgroups' field cannot be set to 'true' on " + "executor containers")); + + driver.stop(); + driver.join(); +} + // TODO(jieyu): Add tests for checking duplicated persistence ID // against offered resources. @@ -4639,6 +4847,569 @@ TEST_F(TaskGroupValidationTest, TaskEnvironmentInvalid) } +TEST_F(TaskGroupValidationTest, ResourceLimits) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + const Offer& offer = offers->at(0); + + const SlaveID slaveId = offer.slave_id(); + frameworkInfo.mutable_id()->CopyFrom(offer.framework_id()); + + TaskInfo task1 = createTask( + slaveId, + Resources::parse("cpus:0.5;mem:32").get(), + "exit 0", + None(), + "test-task-1", + id::UUID::random().toString()); + + TaskInfo task2 = createTask( + slaveId, + Resources::parse("cpus:0.5;mem:32").get(), + "exit 0", + None(), + "test-task-2", + id::UUID::random().toString()); + + Value::Scalar cpuLimit; + cpuLimit.set_value(1); + (*task1.mutable_limits())["cpus"] = cpuLimit; + (*task2.mutable_limits())["cpus"] = cpuLimit; + + Value::Scalar validMemLimit; + validMemLimit.set_value(64); + (*task1.mutable_limits())["mem"] = validMemLimit; + + Value::Scalar invalidMemLimit; + invalidMemLimit.set_value(16); + (*task2.mutable_limits())["mem"] = invalidMemLimit; + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::MESOS); + containerInfo.mutable_linux_info()->set_share_cgroups(false); + + task1.mutable_container()->CopyFrom(containerInfo); + task2.mutable_container()->CopyFrom(containerInfo); + + ExecutorInfo executor = createExecutorInfo( + "test-executor", + None(), + Resources::parse("cpus:0.5;mem:32;disk:128").get(), + ExecutorInfo::DEFAULT, + frameworkInfo.id()); + + // Use short filters so that resources are re-offered. + Filters filters; + filters.set_refuse_seconds(0); + + // Error: A memory limit less than the memory request. + { + TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task1); + taskGroup.add_tasks()->CopyFrom(task2); + + Offer::Operation operation; + operation.set_type(Offer::Operation::LAUNCH_GROUP); + + Offer::Operation::LaunchGroup* launchGroup = + operation.mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executor); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + Future<TaskStatus> task1Status; + Future<TaskStatus> task2Status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&task1Status)) + .WillOnce(FutureArg<1>(&task2Status)); + + driver.acceptOffers({offers->at(0).id()}, {operation}, filters); + + AWAIT_READY(task1Status); + EXPECT_EQ(TASK_ERROR, task1Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task1Status->reason()); + EXPECT_TRUE(strings::contains( + task1Status->message(), + "memory limit must be greater")); + + AWAIT_READY(task2Status); + EXPECT_EQ(TASK_ERROR, task2Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task2Status->reason()); + EXPECT_TRUE(strings::contains( + task2Status->message(), + "memory limit must be greater")); + } + + (*task2.mutable_limits())["mem"] = validMemLimit; + + containerInfo.mutable_linux_info()->set_share_cgroups(true); + + task2.mutable_container()->CopyFrom(containerInfo); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + Clock::advance(masterFlags.allocation_interval); + Clock::settle(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + // Error: limits set when 'share_cgroups' is 'true'. + { + TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task1); + taskGroup.add_tasks()->CopyFrom(task2); + + Offer::Operation operation; + operation.set_type(Offer::Operation::LAUNCH_GROUP); + + Offer::Operation::LaunchGroup* launchGroup = + operation.mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executor); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + Future<TaskStatus> task1Status; + Future<TaskStatus> task2Status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&task1Status)) + .WillOnce(FutureArg<1>(&task2Status)); + + driver.acceptOffers({offers->at(0).id()}, {operation}, filters); + + AWAIT_READY(task1Status); + EXPECT_EQ(TASK_ERROR, task1Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task1Status->reason()); + EXPECT_TRUE(strings::contains( + task1Status->message(), + "Resource limits may only be set for tasks within a task group when " + "the 'share_cgroups' field is set to 'false'.")); + + AWAIT_READY(task2Status); + EXPECT_EQ(TASK_ERROR, task2Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task2Status->reason()); + EXPECT_TRUE(strings::contains( + task2Status->message(), + "Resource limits may only be set for tasks within a task group when " + "the 'share_cgroups' field is set to 'false'.")); + } + + containerInfo.mutable_linux_info()->set_share_cgroups(false); + + task2.mutable_container()->CopyFrom(containerInfo); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + Clock::advance(masterFlags.allocation_interval); + Clock::settle(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + // Valid, no error. + { + TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task1); + taskGroup.add_tasks()->CopyFrom(task2); + + Offer::Operation operation; + operation.set_type(Offer::Operation::LAUNCH_GROUP); + + Offer::Operation::LaunchGroup* launchGroup = + operation.mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executor); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + Future<TaskStatus> task1Status; + Future<TaskStatus> task2Status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&task1Status)) + .WillOnce(FutureArg<1>(&task2Status)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + driver.acceptOffers({offers->at(0).id()}, {operation}, filters); + + AWAIT_READY(task1Status); + EXPECT_EQ(TASK_STARTING, task1Status->state()); + + AWAIT_READY(task2Status); + EXPECT_EQ(TASK_STARTING, task2Status->state()); + } +} + + +TEST_F(TaskGroupValidationTest, ShareCgroup) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + Clock::advance(masterFlags.allocation_interval); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + const SlaveID slaveId = offers->at(0).slave_id(); + frameworkInfo.mutable_id()->CopyFrom(offers->at(0).framework_id()); + + TaskInfo task1 = createTask( + slaveId, + Resources::parse("cpus:0.1;mem:32").get(), + "exit 0", + None(), + "test-task-1", + id::UUID::random().toString()); + + TaskInfo task2 = createTask( + slaveId, + Resources::parse("cpus:0.1;mem:32").get(), + "exit 0", + None(), + "test-task-2", + id::UUID::random().toString()); + + ContainerInfo containerInfoNestedCgroups; + containerInfoNestedCgroups.set_type(ContainerInfo::MESOS); + containerInfoNestedCgroups.mutable_linux_info()->set_share_cgroups(false); + + task2.mutable_container()->CopyFrom(containerInfoNestedCgroups); + + ExecutorInfo executor = createExecutorInfo( + "test-executor", + None(), + Resources::parse("cpus:0.5;mem:32;disk:128").get(), + ExecutorInfo::DEFAULT, + frameworkInfo.id()); + + // Use short filters so that resources are re-offered. + Filters filters; + filters.set_refuse_seconds(0); + + // Error: One task with 'share_cgroups==false', another task with no + // 'LinuxInfo' set. + { + TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task1); + taskGroup.add_tasks()->CopyFrom(task2); + + Offer::Operation operation; + operation.set_type(Offer::Operation::LAUNCH_GROUP); + + Offer::Operation::LaunchGroup* launchGroup = + operation.mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executor); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + Future<TaskStatus> task1Status; + Future<TaskStatus> task2Status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&task1Status)) + .WillOnce(FutureArg<1>(&task2Status)); + + driver.acceptOffers({offers->at(0).id()}, {operation}, filters); + + AWAIT_READY(task1Status); + EXPECT_EQ(TASK_ERROR, task1Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task1Status->reason()); + EXPECT_TRUE(strings::contains( + task1Status->message(), + "If set, the value of 'share_cgroups' must be the same for all tasks " + "in each task group and under a single executor")); + + AWAIT_READY(task2Status); + EXPECT_EQ(TASK_ERROR, task2Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task2Status->reason()); + EXPECT_TRUE(strings::contains( + task2Status->message(), + "If set, the value of 'share_cgroups' must be the same for all tasks " + "in each task group and under a single executor")); + } + + ContainerInfo containerInfoSharedCgroups; + containerInfoSharedCgroups.set_type(ContainerInfo::MESOS); + containerInfoSharedCgroups.mutable_linux_info()->set_share_cgroups(true); + + task1.mutable_container()->CopyFrom(containerInfoSharedCgroups); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + Clock::advance(masterFlags.allocation_interval); + Clock::settle(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + // Error: Tasks with different values of 'share_cgroups'. + { + TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task1); + taskGroup.add_tasks()->CopyFrom(task2); + + Offer::Operation operation; + operation.set_type(Offer::Operation::LAUNCH_GROUP); + + Offer::Operation::LaunchGroup* launchGroup = + operation.mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executor); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + Future<TaskStatus> task1Status; + Future<TaskStatus> task2Status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&task1Status)) + .WillOnce(FutureArg<1>(&task2Status)); + + driver.acceptOffers({offers->at(0).id()}, {operation}, filters); + + AWAIT_READY(task1Status); + EXPECT_EQ(TASK_ERROR, task1Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task1Status->reason()); + EXPECT_TRUE(strings::contains( + task1Status->message(), + "If set, the value of 'share_cgroups' must be the same for all tasks " + "in each task group and under a single executor")); + + AWAIT_READY(task2Status); + EXPECT_EQ(TASK_ERROR, task2Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task2Status->reason()); + EXPECT_TRUE(strings::contains( + task2Status->message(), + "If set, the value of 'share_cgroups' must be the same for all tasks " + "in each task group and under a single executor")); + } + + task1.mutable_container()->CopyFrom(containerInfoNestedCgroups); + + executor.set_type(ExecutorInfo::CUSTOM); + executor.mutable_container()->CopyFrom(containerInfoSharedCgroups); + executor.mutable_command()->set_value("exit 0"); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + Clock::advance(masterFlags.allocation_interval); + Clock::settle(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + // Error: Executor with 'share_cgroups' set to 'true'. + { + TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task1); + taskGroup.add_tasks()->CopyFrom(task2); + + Offer::Operation operation; + operation.set_type(Offer::Operation::LAUNCH_GROUP); + + Offer::Operation::LaunchGroup* launchGroup = + operation.mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executor); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + Future<TaskStatus> task1Status; + Future<TaskStatus> task2Status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&task1Status)) + .WillOnce(FutureArg<1>(&task2Status)); + + driver.acceptOffers({offers->at(0).id()}, {operation}, filters); + + AWAIT_READY(task1Status); + EXPECT_EQ(TASK_ERROR, task1Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task1Status->reason()); + EXPECT_TRUE(strings::contains( + task1Status->message(), + "The 'share_cgroups' field cannot be set to 'true' on " + "executor containers")); + + AWAIT_READY(task2Status); + EXPECT_EQ(TASK_ERROR, task2Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task2Status->reason()); + EXPECT_TRUE(strings::contains( + task2Status->message(), + "The 'share_cgroups' field cannot be set to 'true' on " + "executor containers")); + } + + executor.mutable_container()->CopyFrom(containerInfoNestedCgroups); + executor.set_type(ExecutorInfo::DEFAULT); + executor.clear_command(); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + Clock::advance(masterFlags.allocation_interval); + Clock::settle(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + // Valid, no error. + { + TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task1); + taskGroup.add_tasks()->CopyFrom(task2); + + Offer::Operation operation; + operation.set_type(Offer::Operation::LAUNCH_GROUP); + + Offer::Operation::LaunchGroup* launchGroup = + operation.mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executor); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + Future<TaskStatus> task1Starting; + Future<TaskStatus> task2Starting; + Future<TaskStatus> task1Running; + Future<TaskStatus> task2Running; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&task1Starting)) + .WillOnce(FutureArg<1>(&task2Starting)) + .WillOnce(FutureArg<1>(&task1Running)) + .WillOnce(FutureArg<1>(&task2Running)); + + driver.acceptOffers({offers->at(0).id()}, {operation}, filters); + + AWAIT_READY(task1Starting); + EXPECT_EQ(TASK_STARTING, task1Starting->state()); + AWAIT_READY(task2Starting); + EXPECT_EQ(TASK_STARTING, task2Starting->state()); + + AWAIT_READY(task1Running); + EXPECT_EQ(TASK_RUNNING, task1Running->state()); + AWAIT_READY(task2Running); + EXPECT_EQ(TASK_RUNNING, task2Running->state()); + } + + task1 = createTask( + slaveId, + Resources::parse("cpus:0.1;mem:32").get(), + "exit 0", + None(), + "test-task-1", + id::UUID::random().toString()); + + task2 = createTask( + slaveId, + Resources::parse("cpus:0.1;mem:32").get(), + "exit 0", + None(), + "test-task-2", + id::UUID::random().toString()); + + task1.mutable_container()->CopyFrom(containerInfoSharedCgroups); + task2.mutable_container()->CopyFrom(containerInfoSharedCgroups); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)); + + Clock::advance(masterFlags.allocation_interval); + Clock::settle(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + // Error: The 'share_cgroups' field must have the same value for all tasks + // under a single executor. + { + TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(task1); + taskGroup.add_tasks()->CopyFrom(task2); + + Offer::Operation operation; + operation.set_type(Offer::Operation::LAUNCH_GROUP); + + Offer::Operation::LaunchGroup* launchGroup = + operation.mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(executor); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + Future<TaskStatus> task1Status; + Future<TaskStatus> task2Status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&task1Status)) + .WillOnce(FutureArg<1>(&task2Status)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + driver.acceptOffers({offers->at(0).id()}, {operation}, filters); + + AWAIT_READY(task1Status); + EXPECT_EQ(TASK_ERROR, task1Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task1Status->reason()); + EXPECT_TRUE(strings::contains( + task1Status->message(), + "If set, the value of 'share_cgroups' must be the same for all " + "tasks in each task group and under a single executor")); + + AWAIT_READY(task2Status); + EXPECT_EQ(TASK_ERROR, task2Status->state()); + EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task2Status->reason()); + EXPECT_TRUE(strings::contains( + task2Status->message(), + "If set, the value of 'share_cgroups' must be the same for all " + "tasks in each task group and under a single executor")); + } +} + + class FrameworkInfoValidationTest : public MesosTest {};
