Repository: mesos Updated Branches: refs/heads/master 6380d3adc -> acd656c4e
Fixed the bug in validating duplicated task ID and added a test. Review: https://reviews.apache.org/r/27903 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/acd656c4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/acd656c4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/acd656c4 Branch: refs/heads/master Commit: acd656c4eb658d8c6ab9737a27af6547038f2999 Parents: 6380d3a Author: Jie Yu <[email protected]> Authored: Tue Nov 11 19:18:48 2014 -0800 Committer: Jie Yu <[email protected]> Committed: Fri Nov 14 22:41:44 2014 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 13 +- src/tests/resource_offers_tests.cpp | 764 +++++++++++++++++-------------- 2 files changed, 433 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/acd656c4/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 4b5d582..83c2f8a 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2591,10 +2591,11 @@ void Master::_launchTasks( const TaskInfo& task = tasks[index++]; // NOTE: The task will not be in 'pendingTasks' if 'killTask()' - // for the task was called before we are here. - if (!framework->pendingTasks.contains(task.task_id())) { - continue; - } + // for the task was called before we are here. No need to launch + // the task if it's no longer pending. However, we still need to + // check the authorization result and do the validation so that we + // can send status update in case the task has duplicated ID. + bool pending = framework->pendingTasks.contains(task.task_id()); // Remove from pending tasks. framework->pendingTasks.erase(task.task_id()); @@ -2656,7 +2657,9 @@ void Master::_launchTasks( } // Launch task. 
- usedResources += launchTask(task, framework, slave); + if (pending) { + usedResources += launchTask(task, framework, slave); + } } // All used resources should be allocatable, enforced by our validators. http://git-wip-us.apache.org/repos/asf/mesos/blob/acd656c4/src/tests/resource_offers_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp index 21cb5ad..43820b0 100644 --- a/src/tests/resource_offers_tests.cpp +++ b/src/tests/resource_offers_tests.cpp @@ -53,59 +53,15 @@ using testing::AtMost; using testing::Return; -class ResourceOffersTest : public MesosTest {}; - - -TEST_F(ResourceOffersTest, ResourceOfferWithMultipleSlaves) -{ - Try<PID<Master> > master = StartMaster(); - ASSERT_SOME(master); - - // Start 10 slaves. - for (int i = 0; i < 10; i++) { - slave::Flags flags = CreateSlaveFlags(); - - flags.resources = Option<std::string>("cpus:2;mem:1024"); - - Try<PID<Slave> > slave = StartSlave(flags); - ASSERT_SOME(slave); - } - - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched, registered(&driver, _, _)) - .Times(1); - - Future<vector<Offer> > offers; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // All 10 slaves might not be in first offer. 
- - driver.start(); - - AWAIT_READY(offers); - EXPECT_NE(0u, offers.get().size()); - EXPECT_GE(10u, offers.get().size()); - - Resources resources(offers.get()[0].resources()); - EXPECT_EQ(2, resources.get("cpus", Value::Scalar()).value()); - EXPECT_EQ(1024, resources.get("mem", Value::Scalar()).value()); - - driver.stop(); - driver.join(); - - Shutdown(); -} +class TaskValidationTest : public MesosTest {}; -TEST_F(ResourceOffersTest, TaskUsesInvalidFrameworkID) +TEST_F(TaskValidationTest, TaskUsesInvalidFrameworkID) { - Try<PID<Master> > master = StartMaster(); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - Try<PID<Slave> > slave = StartSlave(); + Try<PID<Slave>> slave = StartSlave(); ASSERT_SOME(slave); MockScheduler sched; @@ -141,12 +97,12 @@ TEST_F(ResourceOffersTest, TaskUsesInvalidFrameworkID) } -TEST_F(ResourceOffersTest, TaskUsesCommandInfoAndExecutorInfo) +TEST_F(TaskValidationTest, TaskUsesCommandInfoAndExecutorInfo) { - Try<PID<Master> > master = StartMaster(); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - Try<PID<Slave> > slave = StartSlave(); + Try<PID<Slave>> slave = StartSlave(); ASSERT_SOME(slave); MockScheduler sched; @@ -155,7 +111,7 @@ TEST_F(ResourceOffersTest, TaskUsesCommandInfoAndExecutorInfo) EXPECT_CALL(sched, registered(&driver, _, _)); - Future<vector<Offer> > offers; + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(Return()); // Ignore subsequent offers. 
@@ -190,12 +146,12 @@ TEST_F(ResourceOffersTest, TaskUsesCommandInfoAndExecutorInfo) } -TEST_F(ResourceOffersTest, TaskUsesNoResources) +TEST_F(TaskValidationTest, TaskUsesNoResources) { - Try<PID<Master> > master = StartMaster(); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - Try<PID<Slave> > slave = StartSlave(); + Try<PID<Slave>> slave = StartSlave(); ASSERT_SOME(slave); MockScheduler sched; @@ -205,7 +161,7 @@ TEST_F(ResourceOffersTest, TaskUsesNoResources) EXPECT_CALL(sched, registered(&driver, _, _)) .Times(1); - Future<vector<Offer> > offers; + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(Return()); // Ignore subsequent offers. @@ -244,12 +200,12 @@ TEST_F(ResourceOffersTest, TaskUsesNoResources) } -TEST_F(ResourceOffersTest, TaskUsesInvalidResources) +TEST_F(TaskValidationTest, TaskUsesInvalidResources) { - Try<PID<Master> > master = StartMaster(); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - Try<PID<Slave> > slave = StartSlave(); + Try<PID<Slave>> slave = StartSlave(); ASSERT_SOME(slave); MockScheduler sched; @@ -259,7 +215,7 @@ TEST_F(ResourceOffersTest, TaskUsesInvalidResources) EXPECT_CALL(sched, registered(&driver, _, _)) .Times(1); - Future<vector<Offer> > offers; + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(Return()); // Ignore subsequent offers. 
@@ -303,12 +259,12 @@ TEST_F(ResourceOffersTest, TaskUsesInvalidResources) } -TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered) +TEST_F(TaskValidationTest, TaskUsesMoreResourcesThanOffered) { - Try<PID<Master> > master = StartMaster(); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - Try<PID<Slave> > slave = StartSlave(); + Try<PID<Slave>> slave = StartSlave(); ASSERT_SOME(slave); MockScheduler sched; @@ -318,7 +274,7 @@ TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered) EXPECT_CALL(sched, registered(&driver, _, _)) .Times(1); - Future<vector<Offer> > offers; + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(Return()); // Ignore subsequent offers. @@ -365,212 +321,192 @@ TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered) } -TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterFrameworkStops) +// This test verifies that if two tasks are launched with the same +// task ID, the second task will get rejected. 
+TEST_F(TaskValidationTest, DuplicatedTaskID) { - Try<PID<Master> > master = StartMaster(); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - Try<PID<Slave> > slave = StartSlave(); + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave>> slave = StartSlave(&exec); ASSERT_SOME(slave); - MockScheduler sched1; - MesosSchedulerDriver driver1( - &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - EXPECT_CALL(sched1, registered(&driver1, _, _)) - .Times(1); + EXPECT_CALL(sched, registered(&driver, _, _)); - Future<vector<Offer> > offers; - EXPECT_CALL(sched1, resourceOffers(&driver1, _)) - .WillOnce(FutureArg<1>(&offers)); + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. - driver1.start(); + driver.start(); AWAIT_READY(offers); EXPECT_NE(0u, offers.get().size()); - driver1.stop(); - driver1.join(); - - MockScheduler sched2; - MesosSchedulerDriver driver2( - &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched2, registered(&driver2, _, _)) - .Times(1); - - EXPECT_CALL(sched2, resourceOffers(&driver2, _)) - .WillOnce(FutureArg<1>(&offers)); - - driver2.start(); - - AWAIT_READY(offers); - - driver2.stop(); - driver2.join(); - - Shutdown(); -} - - -TEST_F(ResourceOffersTest, ResourcesGetReofferedWhenUnused) -{ - Try<PID<Master> > master = StartMaster(); - ASSERT_SOME(master); - - Try<PID<Slave> > slave = StartSlave(); - ASSERT_SOME(slave); - - MockScheduler sched1; - MesosSchedulerDriver driver1( - &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + ExecutorInfo executor; + executor.mutable_executor_id()->set_value("default"); + executor.mutable_command()->set_value("exit 1"); - EXPECT_CALL(sched1, registered(&driver1, _, _)) - .Times(1); 
+ // Create two tasks with the same id. + TaskInfo task1; + task1.set_name(""); + task1.mutable_task_id()->set_value("1"); + task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); + task1.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:32").get()); + task1.mutable_executor()->MergeFrom(executor); - Future<vector<Offer> > offers; - EXPECT_CALL(sched1, resourceOffers(&driver1, _)) - .WillOnce(FutureArg<1>(&offers)); + TaskInfo task2; + task2.set_name(""); + task2.mutable_task_id()->set_value("1"); + task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); + task2.mutable_resources()->MergeFrom(Resources::parse("cpus:1;mem:32").get()); + task2.mutable_executor()->MergeFrom(executor); - driver1.start(); + vector<TaskInfo> tasks; + tasks.push_back(task1); + tasks.push_back(task2); - AWAIT_READY(offers); - EXPECT_NE(0u, offers.get().size()); + EXPECT_CALL(exec, registered(_, _, _, _)); - vector<TaskInfo> tasks; // Use nothing! - driver1.launchTasks(offers.get()[0].id(), tasks); + // Grab the first task but don't send a status update. 
+ Future<TaskInfo> task; + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(FutureArg<1>(&task)); - MockScheduler sched2; - MesosSchedulerDriver driver2( - &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); - EXPECT_CALL(sched2, registered(&driver2, _, _)) - .Times(1); + driver.launchTasks(offers.get()[0].id(), tasks); - EXPECT_CALL(sched2, resourceOffers(&driver2, _)) - .WillOnce(FutureArg<1>(&offers)); + AWAIT_READY(task); + EXPECT_EQ(task1.task_id(), task.get().task_id()); - driver2.start(); + AWAIT_READY(status); + EXPECT_EQ(TASK_ERROR, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); - AWAIT_READY(offers); + EXPECT_TRUE(strings::startsWith( + status.get().message(), "Task has duplicate ID")); - // Stop first framework before second so no offers are sent. - driver1.stop(); - driver1.join(); + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); - driver2.stop(); - driver2.join(); + driver.stop(); + driver.join(); Shutdown(); } -TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError) +// This test verifies that two tasks launched on the same slave with +// the same executor id but different executor info are rejected. 
+TEST_F(TaskValidationTest, ExecutorInfoDiffersOnSameSlave) { - Try<PID<Master> > master = StartMaster(); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - Try<PID<Slave> > slave = StartSlave(); + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave>> slave = StartSlave(&exec); ASSERT_SOME(slave); - MockScheduler sched1; - MesosSchedulerDriver driver1( - &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - EXPECT_CALL(sched1, registered(&driver1, _, _)) + EXPECT_CALL(sched, registered(&driver, _, _)) .Times(1); - Future<vector<Offer> > offers; - EXPECT_CALL(sched1, resourceOffers(&driver1, _)) + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(Return()); // Ignore subsequent offers. - driver1.start(); + driver.start(); AWAIT_READY(offers); EXPECT_NE(0u, offers.get().size()); - TaskInfo task; - task.set_name(""); - task.mutable_task_id()->set_value("1"); - task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); - task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); + ExecutorInfo executor; + executor.mutable_executor_id()->set_value("default"); + executor.mutable_command()->set_value("exit 1"); - Resource* cpus = task.add_resources(); - cpus->set_name("cpus"); - cpus->set_type(Value::SCALAR); - cpus->mutable_scalar()->set_value(0); + TaskInfo task1; + task1.set_name(""); + task1.mutable_task_id()->set_value("1"); + task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); + task1.mutable_resources()->MergeFrom( + Resources::parse("cpus:1;mem:512").get()); + task1.mutable_executor()->MergeFrom(executor); - Resource* mem = task.add_resources(); - mem->set_name("mem"); - mem->set_type(Value::SCALAR); - mem->mutable_scalar()->set_value(Gigabytes(1).bytes()); + executor.mutable_command()->set_value("exit 2"); + + 
TaskInfo task2; + task2.set_name(""); + task2.mutable_task_id()->set_value("2"); + task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); + task2.mutable_resources()->MergeFrom( + Resources::parse("cpus:1;mem:512").get()); + task2.mutable_executor()->MergeFrom(executor); vector<TaskInfo> tasks; - tasks.push_back(task); + tasks.push_back(task1); + tasks.push_back(task2); + + EXPECT_CALL(exec, registered(_, _, _, _)) + .Times(1); + + // Grab the "good" task but don't send a status update. + Future<TaskInfo> task; + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(FutureArg<1>(&task)); Future<TaskStatus> status; - EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + EXPECT_CALL(sched, statusUpdate(&driver, _)) .WillOnce(FutureArg<1>(&status)); - driver1.launchTasks(offers.get()[0].id(), tasks); + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(task); + EXPECT_EQ(task1.task_id(), task.get().task_id()); AWAIT_READY(status); - EXPECT_EQ(task.task_id(), status.get().task_id()); + EXPECT_EQ(task2.task_id(), status.get().task_id()); EXPECT_EQ(TASK_ERROR, status.get().state()); EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); EXPECT_TRUE(status.get().has_message()); - EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message()); - - MockScheduler sched2; - MesosSchedulerDriver driver2( - &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - - EXPECT_CALL(sched2, registered(&driver2, _, _)) - .Times(1); - - EXPECT_CALL(sched2, resourceOffers(&driver2, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. 
- - driver2.start(); - - AWAIT_READY(offers); + EXPECT_TRUE(strings::contains( + status.get().message(), "Task has invalid ExecutorInfo")); - driver1.stop(); - driver1.join(); + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); - driver2.stop(); - driver2.join(); + driver.stop(); + driver.join(); Shutdown(); } -// TODO(benh): Add tests for checking correct slave IDs. - -// TODO(benh): Add tests for checking executor resource usage. - -// TODO(benh): Add tests which launch multiple tasks and check for -// unique task IDs and aggregate resource usage. - -TEST_F(ResourceOffersTest, Request) +// This test verifies that two tasks each launched on a different +// slave with same executor id but different executor info are +// allowed. +TEST_F(TaskValidationTest, ExecutorInfoDiffersOnDifferentSlaves) { - MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator; - - EXPECT_CALL(allocator, initialize(_, _, _)) - .Times(1); - - Try<PID<Master> > master = StartMaster(&allocator); - ASSERT_SOME(master); + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); MockScheduler sched; MesosSchedulerDriver driver( &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - EXPECT_CALL(allocator, frameworkAdded(_, _, _)) - .Times(1); - Future<Nothing> registered; EXPECT_CALL(sched, registered(&driver, _, _)) .WillOnce(FutureSatisfy(&registered)); @@ -579,27 +515,91 @@ TEST_F(ResourceOffersTest, Request) AWAIT_READY(registered); - vector<Request> sent; - Request request; - request.mutable_slave_id()->set_value("test"); - sent.push_back(request); + Future<vector<Offer>> offers1; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers1)); - Future<vector<Request> > received; - EXPECT_CALL(allocator, resourcesRequested(_, _)) - .WillOnce(FutureArg<1>(&received)); + // Start the first slave. 
+ MockExecutor exec1(DEFAULT_EXECUTOR_ID); - driver.requestResources(sent); + Try<PID<Slave>> slave1 = StartSlave(&exec1); + ASSERT_SOME(slave1); - AWAIT_READY(received); - EXPECT_EQ(sent.size(), received.get().size()); - EXPECT_NE(0u, received.get().size()); - EXPECT_EQ(request.slave_id(), received.get()[0].slave_id()); + AWAIT_READY(offers1); + EXPECT_NE(0u, offers1.get().size()); - EXPECT_CALL(allocator, frameworkDeactivated(_)) - .Times(AtMost(1)); // Races with shutting down the cluster. + // Launch the first task with the default executor id. + ExecutorInfo executor1; + executor1 = DEFAULT_EXECUTOR_INFO; + executor1.mutable_command()->set_value("exit 1"); - EXPECT_CALL(allocator, frameworkRemoved(_)) - .Times(AtMost(1)); // Races with shutting down the cluster. + TaskInfo task1 = createTask( + offers1.get()[0], executor1.command().value(), executor1.executor_id()); + + vector<TaskInfo> tasks1; + tasks1.push_back(task1); + + EXPECT_CALL(exec1, registered(_, _, _, _)) + .Times(1); + + EXPECT_CALL(exec1, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + Future<TaskStatus> status1; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status1)); + + driver.launchTasks(offers1.get()[0].id(), tasks1); + + AWAIT_READY(status1); + ASSERT_EQ(TASK_RUNNING, status1.get().state()); + + Future<vector<Offer>> offers2; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers2)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + // Now start the second slave. + MockExecutor exec2(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave>> slave2 = StartSlave(&exec2); + ASSERT_SOME(slave2); + + AWAIT_READY(offers2); + EXPECT_NE(0u, offers2.get().size()); + + // Now launch the second task with the same executor id but + // a different executor command. 
+ ExecutorInfo executor2; + executor2 = executor1; + executor2.mutable_command()->set_value("exit 2"); + + TaskInfo task2 = createTask( + offers2.get()[0], executor2.command().value(), executor2.executor_id()); + + vector<TaskInfo> tasks2; + tasks2.push_back(task2); + + EXPECT_CALL(exec2, registered(_, _, _, _)) + .Times(1); + + EXPECT_CALL(exec2, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + Future<TaskStatus> status2; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status2)); + + driver.launchTasks(offers2.get()[0].id(), tasks2); + + AWAIT_READY(status2); + ASSERT_EQ(TASK_RUNNING, status2.get().state()); + + EXPECT_CALL(exec1, shutdown(_)) + .Times(AtMost(1)); + + EXPECT_CALL(exec2, shutdown(_)) + .Times(AtMost(1)); driver.stop(); driver.join(); @@ -608,19 +608,31 @@ TEST_F(ResourceOffersTest, Request) } -class MultipleExecutorsTest : public MesosTest {}; +// TODO(benh): Add tests for checking correct slave IDs. -// This test verifies that two tasks launched on the same slave with -// the same executor id but different executor info are rejected. -TEST_F(MultipleExecutorsTest, ExecutorInfoDiffersOnSameSlave) +// TODO(benh): Add tests for checking executor resource usage. + +// TODO(benh): Add tests which launch multiple tasks and check for +// aggregate resource usage. + + +class ResourceOffersTest : public MesosTest {}; + + +TEST_F(ResourceOffersTest, ResourceOfferWithMultipleSlaves) { - Try<PID<Master> > master = StartMaster(); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - MockExecutor exec(DEFAULT_EXECUTOR_ID); + // Start 10 slaves. 
+ for (int i = 0; i < 10; i++) { + slave::Flags flags = CreateSlaveFlags(); - Try<PID<Slave> > slave = StartSlave(&exec); - ASSERT_SOME(slave); + flags.resources = Option<std::string>("cpus:2;mem:1024"); + + Try<PID<Slave>> slave = StartSlave(flags); + ASSERT_SOME(slave); + } MockScheduler sched; MesosSchedulerDriver driver( @@ -629,182 +641,256 @@ TEST_F(MultipleExecutorsTest, ExecutorInfoDiffersOnSameSlave) EXPECT_CALL(sched, registered(&driver, _, _)) .Times(1); - Future<vector<Offer> > offers; + Future<vector<Offer>> offers; EXPECT_CALL(sched, resourceOffers(&driver, _)) .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. + .WillRepeatedly(Return()); // All 10 slaves might not be in first offer. driver.start(); AWAIT_READY(offers); EXPECT_NE(0u, offers.get().size()); + EXPECT_GE(10u, offers.get().size()); - ExecutorInfo executor; - executor.mutable_executor_id()->set_value("default"); - executor.mutable_command()->set_value("exit 1"); + Resources resources(offers.get()[0].resources()); + EXPECT_EQ(2, resources.get("cpus", Value::Scalar()).value()); + EXPECT_EQ(1024, resources.get("mem", Value::Scalar()).value()); - TaskInfo task1; - task1.set_name(""); - task1.mutable_task_id()->set_value("1"); - task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); - task1.mutable_resources()->MergeFrom( - Resources::parse("cpus:1;mem:512").get()); - task1.mutable_executor()->MergeFrom(executor); + driver.stop(); + driver.join(); - executor.mutable_command()->set_value("exit 2"); + Shutdown(); +} - TaskInfo task2; - task2.set_name(""); - task2.mutable_task_id()->set_value("2"); - task2.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); - task2.mutable_resources()->MergeFrom( - Resources::parse("cpus:1;mem:512").get()); - task2.mutable_executor()->MergeFrom(executor); - vector<TaskInfo> tasks; - tasks.push_back(task1); - tasks.push_back(task2); +TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterFrameworkStops) +{ + 
Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); - EXPECT_CALL(exec, registered(_, _, _, _)) + Try<PID<Slave>> slave = StartSlave(); + ASSERT_SOME(slave); + + MockScheduler sched1; + MesosSchedulerDriver driver1( + &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched1, registered(&driver1, _, _)) .Times(1); - // Grab the "good" task but don't send a status update. - Future<TaskInfo> task; - EXPECT_CALL(exec, launchTask(_, _)) - .WillOnce(FutureArg<1>(&task)); + Future<vector<Offer>> offers; + EXPECT_CALL(sched1, resourceOffers(&driver1, _)) + .WillOnce(FutureArg<1>(&offers)); - Future<TaskStatus> status; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status)); + driver1.start(); - driver.launchTasks(offers.get()[0].id(), tasks); + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); - AWAIT_READY(task); - EXPECT_EQ(task1.task_id(), task.get().task_id()); + driver1.stop(); + driver1.join(); - AWAIT_READY(status); - EXPECT_EQ(task2.task_id(), status.get().task_id()); - EXPECT_EQ(TASK_ERROR, status.get().state()); - EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); - EXPECT_TRUE(status.get().has_message()); - EXPECT_TRUE(strings::contains( - status.get().message(), "Task has invalid ExecutorInfo")); + MockScheduler sched2; + MesosSchedulerDriver driver2( + &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - EXPECT_CALL(exec, shutdown(_)) - .Times(AtMost(1)); + EXPECT_CALL(sched2, registered(&driver2, _, _)) + .Times(1); - driver.stop(); - driver.join(); + EXPECT_CALL(sched2, resourceOffers(&driver2, _)) + .WillOnce(FutureArg<1>(&offers)); + + driver2.start(); + + AWAIT_READY(offers); + + driver2.stop(); + driver2.join(); Shutdown(); } -// This test verifies that two tasks each launched on a different -// slave with same executor id but different executor info are -// allowed. 
-TEST_F(MultipleExecutorsTest, ExecutorInfoDiffersOnDifferentSlaves) +TEST_F(ResourceOffersTest, ResourcesGetReofferedWhenUnused) { - Try<PID<Master> > master = StartMaster(); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + Try<PID<Slave>> slave = StartSlave(); + ASSERT_SOME(slave); - Future<Nothing> registered; - EXPECT_CALL(sched, registered(&driver, _, _)) - .WillOnce(FutureSatisfy(&registered)); + MockScheduler sched1; + MesosSchedulerDriver driver1( + &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - driver.start(); + EXPECT_CALL(sched1, registered(&driver1, _, _)) + .Times(1); - AWAIT_READY(registered); + Future<vector<Offer>> offers; + EXPECT_CALL(sched1, resourceOffers(&driver1, _)) + .WillOnce(FutureArg<1>(&offers)); - Future<vector<Offer> > offers1; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers1)); + driver1.start(); - // Start the first slave. - MockExecutor exec1(DEFAULT_EXECUTOR_ID); + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); - Try<PID<Slave> > slave1 = StartSlave(&exec1); - ASSERT_SOME(slave1); + vector<TaskInfo> tasks; // Use nothing! + driver1.launchTasks(offers.get()[0].id(), tasks); - AWAIT_READY(offers1); - EXPECT_NE(0u, offers1.get().size()); + MockScheduler sched2; + MesosSchedulerDriver driver2( + &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - // Launch the first task with the default executor id. 
- ExecutorInfo executor1; - executor1 = DEFAULT_EXECUTOR_INFO; - executor1.mutable_command()->set_value("exit 1"); + EXPECT_CALL(sched2, registered(&driver2, _, _)) + .Times(1); - TaskInfo task1 = createTask( - offers1.get()[0], executor1.command().value(), executor1.executor_id()); + EXPECT_CALL(sched2, resourceOffers(&driver2, _)) + .WillOnce(FutureArg<1>(&offers)); - vector<TaskInfo> tasks1; - tasks1.push_back(task1); + driver2.start(); - EXPECT_CALL(exec1, registered(_, _, _, _)) + AWAIT_READY(offers); + + // Stop first framework before second so no offers are sent. + driver1.stop(); + driver1.join(); + + driver2.stop(); + driver2.join(); + + Shutdown(); +} + + +TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + Try<PID<Slave>> slave = StartSlave(); + ASSERT_SOME(slave); + + MockScheduler sched1; + MesosSchedulerDriver driver1( + &sched1, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched1, registered(&driver1, _, _)) .Times(1); - EXPECT_CALL(exec1, launchTask(_, _)) - .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + Future<vector<Offer>> offers; + EXPECT_CALL(sched1, resourceOffers(&driver1, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
- Future<TaskStatus> status1; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status1)); + driver1.start(); - driver.launchTasks(offers1.get()[0].id(), tasks1); + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); - AWAIT_READY(status1); - ASSERT_EQ(TASK_RUNNING, status1.get().state()); + TaskInfo task; + task.set_name(""); + task.mutable_task_id()->set_value("1"); + task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id()); + task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); - Future<vector<Offer> > offers2; - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillOnce(FutureArg<1>(&offers2)) + Resource* cpus = task.add_resources(); + cpus->set_name("cpus"); + cpus->set_type(Value::SCALAR); + cpus->mutable_scalar()->set_value(0); + + Resource* mem = task.add_resources(); + mem->set_name("mem"); + mem->set_type(Value::SCALAR); + mem->mutable_scalar()->set_value(Gigabytes(1).bytes()); + + vector<TaskInfo> tasks; + tasks.push_back(task); + + Future<TaskStatus> status; + EXPECT_CALL(sched1, statusUpdate(&driver1, _)) + .WillOnce(FutureArg<1>(&status)); + + driver1.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(status); + EXPECT_EQ(task.task_id(), status.get().task_id()); + EXPECT_EQ(TASK_ERROR, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); + EXPECT_TRUE(status.get().has_message()); + EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message()); + + MockScheduler sched2; + MesosSchedulerDriver driver2( + &sched2, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched2, registered(&driver2, _, _)) + .Times(1); + + EXPECT_CALL(sched2, resourceOffers(&driver2, _)) + .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(Return()); // Ignore subsequent offers. - // Now start the second slave. 
- MockExecutor exec2(DEFAULT_EXECUTOR_ID); + driver2.start(); - Try<PID<Slave> > slave2 = StartSlave(&exec2); - ASSERT_SOME(slave2); + AWAIT_READY(offers); - AWAIT_READY(offers2); - EXPECT_NE(0u, offers2.get().size()); + driver1.stop(); + driver1.join(); - // Now launch the second task with the same executor id but - // a different executor command. - ExecutorInfo executor2; - executor2 = executor1; - executor2.mutable_command()->set_value("exit 2"); + driver2.stop(); + driver2.join(); - TaskInfo task2 = createTask( - offers2.get()[0], executor2.command().value(), executor2.executor_id()); + Shutdown(); +} - vector<TaskInfo> tasks2; - tasks2.push_back(task2); - EXPECT_CALL(exec2, registered(_, _, _, _)) +TEST_F(ResourceOffersTest, Request) +{ + MockAllocatorProcess<HierarchicalDRFAllocatorProcess> allocator; + + EXPECT_CALL(allocator, initialize(_, _, _)) .Times(1); - EXPECT_CALL(exec2, launchTask(_, _)) - .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + Try<PID<Master>> master = StartMaster(&allocator); + ASSERT_SOME(master); - Future<TaskStatus> status2; - EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&status2)); + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); - driver.launchTasks(offers2.get()[0].id(), tasks2); + EXPECT_CALL(allocator, frameworkAdded(_, _, _)) + .Times(1); - AWAIT_READY(status2); - ASSERT_EQ(TASK_RUNNING, status2.get().state()); + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureSatisfy(&registered)); - EXPECT_CALL(exec1, shutdown(_)) - .Times(AtMost(1)); + driver.start(); - EXPECT_CALL(exec2, shutdown(_)) - .Times(AtMost(1)); + AWAIT_READY(registered); + + vector<Request> sent; + Request request; + request.mutable_slave_id()->set_value("test"); + sent.push_back(request); + + Future<vector<Request>> received; + EXPECT_CALL(allocator, resourcesRequested(_, _)) + .WillOnce(FutureArg<1>(&received)); + + 
driver.requestResources(sent); + + AWAIT_READY(received); + EXPECT_EQ(sent.size(), received.get().size()); + EXPECT_NE(0u, received.get().size()); + EXPECT_EQ(request.slave_id(), received.get()[0].slave_id()); + + EXPECT_CALL(allocator, frameworkDeactivated(_)) + .Times(AtMost(1)); // Races with shutting down the cluster. + + EXPECT_CALL(allocator, frameworkRemoved(_)) + .Times(AtMost(1)); // Races with shutting down the cluster. driver.stop(); driver.join();
