Repository: mesos Updated Branches: refs/heads/master 190e87c51 -> 0bdc7adf6
Updated allocator to offer CPU-only or memory-only resources. As explained in JIRA MESOS-1688, some schedulers allocate memory only for the executor and not for its tasks; in this case, tasks are allocated only CPU resources. Such a scheduler is not offered any idle CPUs once the slave's memory is nearly used up. This can easily lead to a deadlock (in the application, not in Mesos). Simple example: 1. The scheduler allocates all memory of a slave for an executor. 2. The scheduler launches a task for this executor (allocating 1 CPU). 3. The task finishes: 1 CPU and 0 MB of memory are allocatable. 4. No offers are made, as no memory is left. The scheduler waits for offers forever: a deadlock in the application. To fix this problem, offers must be made whenever CPU resources are allocatable, without considering allocatable memory (and, likewise, whenever memory is allocatable, without considering CPUs). Review: https://reviews.apache.org/r/25035 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0bdc7adf Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0bdc7adf Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0bdc7adf Branch: refs/heads/master Commit: 0bdc7adf612cec26f5f3b190c17694e67b14b4b2 Parents: 190e87c Author: Martin Weindel <[email protected]> Authored: Wed Sep 17 11:24:25 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Wed Sep 17 11:34:33 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 8 + src/common/resources.cpp | 9 +- src/master/constants.cpp | 2 +- src/master/hierarchical_allocator_process.hpp | 15 +- src/master/master.cpp | 34 +++- src/tests/allocator_tests.cpp | 181 ++++++++++++++++++++- 6 files changed, 227 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0bdc7adf/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 
a822cc4..77e15b3 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,11 @@ +(WIP) Release Notes - Mesos - Version 0.21.0 +-------------------------------------- + +* Deprecations: + * [MESOS-1807] Disallow executors with cpu only or memory only + resources. + + Release Notes - Mesos - Version 0.20.0 -------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0bdc7adf/src/common/resources.cpp ---------------------------------------------------------------------- diff --git a/src/common/resources.cpp b/src/common/resources.cpp index 29fc765..e9a0c85 100644 --- a/src/common/resources.cpp +++ b/src/common/resources.cpp @@ -216,7 +216,9 @@ bool Resources::operator <= (const Resources& that) const foreach (const Resource& resource, resources) { Option<Resource> option = that.get(resource); if (option.isNone()) { - return false; + if (!isZero(resource)) { + return false; + } } else { if (!(resource <= option.get())) { return false; @@ -367,7 +369,10 @@ Option<Resources> Resources::find( Option<Resources> all = getAll(findResource); bool done = false; - if (all.isSome()) { + if (isZero(findResource)) { + // Done, as no resources of this type have been requested. 
+ done = true; + } else if (all.isSome()) { for (int i = 0; i < 3 && !done; i++) { foreach (const Resource& potential, all.get()) { // Ensures that we take resources first from the specified role, http://git-wip-us.apache.org/repos/asf/mesos/blob/0bdc7adf/src/master/constants.cpp ---------------------------------------------------------------------- diff --git a/src/master/constants.cpp b/src/master/constants.cpp index faa1503..3ebd246 100644 --- a/src/master/constants.cpp +++ b/src/master/constants.cpp @@ -29,7 +29,7 @@ namespace internal { namespace master { const int MAX_OFFERS_PER_FRAMEWORK = 50; -const double MIN_CPUS = 0.1; +const double MIN_CPUS = 0.01; const Bytes MIN_MEM = Megabytes(32); const Duration SLAVE_PING_TIMEOUT = Seconds(15); const uint32_t MAX_SLAVE_PING_TIMEOUTS = 5; http://git-wip-us.apache.org/repos/asf/mesos/blob/0bdc7adf/src/master/hierarchical_allocator_process.hpp ---------------------------------------------------------------------- diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp index 94bd228..31dfb2c 100644 --- a/src/master/hierarchical_allocator_process.hpp +++ b/src/master/hierarchical_allocator_process.hpp @@ -828,22 +828,11 @@ bool HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable( const Resources& resources) { - // TODO(benh): For now, only make offers when there is some cpu - // and memory left. This is an artifact of the original code that - // only offered when there was at least 1 cpu "unit" available, - // and without doing this a framework might get offered resources - // with only memory available (which it obviously will decline) - // and then end up waiting the default Filters::refuse_seconds - // (unless the framework set it to something different). 
- Option<double> cpus = resources.cpus(); Option<Bytes> mem = resources.mem(); - if (cpus.isSome() && mem.isSome()) { - return cpus.get() >= MIN_CPUS && mem.get() > MIN_MEM; - } - - return false; + return (cpus.isSome() && cpus.get() >= MIN_CPUS) || + (mem.isSome() && mem.get() >= MIN_MEM); } } // namespace allocator { http://git-wip-us.apache.org/repos/asf/mesos/blob/0bdc7adf/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index d5db24e..41dcc46 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1870,7 +1870,7 @@ struct ResourceUsageChecker : TaskInfoVisitor } // Check if this task uses more resources than offered. - Resources taskResources = task.resources(); + const Resources& taskResources = task.resources(); if (!(taskResources <= resources)) { return "Task " + stringify(task.task_id()) + " attempted to use " + @@ -1880,14 +1880,42 @@ struct ResourceUsageChecker : TaskInfoVisitor // Check this task's executor's resources. if (task.has_executor()) { - // TODO(benh): Check that the executor uses some resources. - foreach (const Resource& resource, task.executor().resources()) { + const Resources& executorResources = task.executor().resources(); + + foreach (const Resource& resource, executorResources) { if (!Resources::isAllocatable(resource)) { // TODO(benh): Send back the invalid resources? return "Executor for task " + stringify(task.task_id()) + " uses invalid resources " + stringify(resource); } } + + // Check minimal cpus and memory resources of executor + // and log warnings if not set. + // TODO(martin): MESOS-1807. Return Error instead of logging a + // warning in 0.22.0. + Option<double> cpus = executorResources.cpus(); + if (cpus.isNone() || cpus.get() < MIN_CPUS) { + LOG(WARNING) + << "Executor " << stringify(task.executor().executor_id()) + << " for task " << stringify(task.task_id()) + << " uses less CPUs (" + << (cpus.isSome() ? 
stringify(cpus.get()) : "None") + << ") than the minimum required (" << MIN_CPUS + << "). Please update your executor, as this will be mandatory " + << "in future releases."; + } + Option<Bytes> mem = executorResources.mem(); + if (mem.isNone() || mem.get() < MIN_MEM) { + LOG(WARNING) + << "Executor " << stringify(task.executor().executor_id()) + << " for task " << stringify(task.task_id()) + << " uses less memory (" + << (mem.isSome() ? stringify(mem.get().megabytes()) : "None") + << ") than the minimum required (" << MIN_MEM + << "). Please update your executor, as this will be mandatory " + << "in future releases."; + } } return None(); http://git-wip-us.apache.org/repos/asf/mesos/blob/0bdc7adf/src/tests/allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp index 49e36f9..9cd5da3 100644 --- a/src/tests/allocator_tests.cpp +++ b/src/tests/allocator_tests.cpp @@ -769,10 +769,9 @@ TEST_F(ReservationAllocatorTest, ResourcesReturned) allocator.real, &HierarchicalDRFAllocatorProcess::slaveAdded); // This slave's resources will never be offered to anyone, - // because there is no framework with role3 and the unreserved - // memory can't be offered without a cpu to go with it. + // because there is no framework with role3. slave::Flags flags2 = CreateSlaveFlags(); - flags2.resources = Some("cpus(role3):4;mem:1024;disk:0"); + flags2.resources = Some("cpus(role3):4;mem(role3):1024;disk:0"); Try<PID<Slave> > slave2 = StartSlave(flags2); ASSERT_SOME(slave2); @@ -1811,6 +1810,182 @@ TYPED_TEST(AllocatorTest, TaskFinished) } +// Checks that cpus only resources are offered +// and tasks using only cpus are launched. 
+TYPED_TEST(AllocatorTest, CpusOnlyOfferedAndTaskLaunched) +{ + EXPECT_CALL(this->allocator, initialize(_, _, _)); + + master::Flags masterFlags = this->CreateMasterFlags(); + masterFlags.allocation_interval = Milliseconds(50); + Try<PID<Master> > master = this->StartMaster(&this->allocator, masterFlags); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + // Start a slave with cpus only resources. + slave::Flags flags = this->CreateSlaveFlags(); + flags.resources = Some("cpus:2;mem:0"); + + EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + + Try<PID<Slave> > slave = this->StartSlave(&exec, flags); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(this->allocator, frameworkAdded(_, _, _)); + + EXPECT_CALL(sched, registered(_, _, _)); + + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) + .WillRepeatedly(DoDefault()); + + // Launch a cpus only task. + EXPECT_CALL(sched, resourceOffers(_, OfferEq(2, 0))) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 0, "*")); + + EXPECT_CALL(exec, registered(_, _, _, _)); + + ExecutorDriver* execDriver; + TaskInfo taskInfo; + Future<Nothing> launchTask; + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(DoAll(SaveArg<0>(&execDriver), + SaveArg<1>(&taskInfo), + SendStatusUpdateFromTask(TASK_RUNNING), + FutureSatisfy(&launchTask))); + + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillRepeatedly(DoDefault()); + + driver.start(); + + AWAIT_READY(launchTask); + + TaskStatus status; + status.mutable_task_id()->MergeFrom(taskInfo.task_id()); + status.set_state(TASK_FINISHED); + + // Check that cpus resources of finished task are offered again. + Future<Nothing> resourceOffers; + EXPECT_CALL(sched, resourceOffers(_, OfferEq(2, 0))) + .WillOnce(FutureSatisfy(&resourceOffers)); + + execDriver->sendStatusUpdate(status); + + AWAIT_READY(resourceOffers); + + // Shut everything down. 
+ EXPECT_CALL(this->allocator, frameworkDeactivated(_)) + .Times(AtMost(1)); + + EXPECT_CALL(this->allocator, frameworkRemoved(_)) + .Times(AtMost(1)); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + EXPECT_CALL(this->allocator, slaveRemoved(_)) + .Times(AtMost(1)); + + this->Shutdown(); +} + + +// Checks that memory only resources are offered +// and tasks using only memory are launched. +TYPED_TEST(AllocatorTest, MemoryOnlyOfferedAndTaskLaunched) +{ + EXPECT_CALL(this->allocator, initialize(_, _, _)); + + master::Flags masterFlags = this->CreateMasterFlags(); + masterFlags.allocation_interval = Milliseconds(50); + Try<PID<Master> > master = this->StartMaster(&this->allocator, masterFlags); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + // Start a slave with memory only resources. + slave::Flags flags = this->CreateSlaveFlags(); + flags.resources = Some("cpus:0;mem:200"); + + EXPECT_CALL(this->allocator, slaveAdded(_, _, _)); + + Try<PID<Slave> > slave = this->StartSlave(&exec, flags); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(this->allocator, frameworkAdded(_, _, _)); + + EXPECT_CALL(sched, registered(_, _, _)); + + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) + .WillRepeatedly(DoDefault()); + + // Launch a memory only task. 
+ EXPECT_CALL(sched, resourceOffers(_, OfferEq(0, 200))) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 0, 200, "*")); + + EXPECT_CALL(exec, registered(_, _, _, _)); + + ExecutorDriver* execDriver; + TaskInfo taskInfo; + Future<Nothing> launchTask; + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(DoAll(SaveArg<0>(&execDriver), + SaveArg<1>(&taskInfo), + SendStatusUpdateFromTask(TASK_RUNNING), + FutureSatisfy(&launchTask))); + + EXPECT_CALL(sched, statusUpdate(_, _)) + .WillRepeatedly(DoDefault()); + + driver.start(); + + AWAIT_READY(launchTask); + + TaskStatus status; + status.mutable_task_id()->MergeFrom(taskInfo.task_id()); + status.set_state(TASK_FINISHED); + + // Check that mem resources of finished task are offered again. + Future<Nothing> resourceOffers; + EXPECT_CALL(sched, resourceOffers(_, OfferEq(0, 200))) + .WillOnce(FutureSatisfy(&resourceOffers)); + + execDriver->sendStatusUpdate(status); + + AWAIT_READY(resourceOffers); + + // Shut everything down. + EXPECT_CALL(this->allocator, frameworkDeactivated(_)) + .Times(AtMost(1)); + + EXPECT_CALL(this->allocator, frameworkRemoved(_)) + .Times(AtMost(1)); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + EXPECT_CALL(this->allocator, slaveRemoved(_)) + .Times(AtMost(1)); + + this->Shutdown(); +} + + // Checks that a slave that is not whitelisted will not have its // resources get offered, and that if the whitelist is updated so // that it is whitelisted, its resources will then be offered.
