Repository: mesos Updated Branches: refs/heads/master 656b0e075 -> 5e94bf097
Merge resourcesRecovered and resourcesUnused. Review: https://reviews.apache.org/r/24331 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5e94bf09 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5e94bf09 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5e94bf09 Branch: refs/heads/master Commit: 5e94bf097f38564fb442b48fc26cd759b90ae784 Parents: 656b0e0 Author: Dominic Hamon <[email protected]> Authored: Tue Aug 5 11:09:56 2014 -0700 Committer: Dominic Hamon <[email protected]> Committed: Wed Aug 6 12:20:52 2014 -0700 ---------------------------------------------------------------------- docs/allocation-module.md | 16 +-- docs/mesos-c++-style-guide.md | 18 ++-- src/master/allocator.hpp | 39 ++----- src/master/hierarchical_allocator_process.hpp | 118 +++++++-------------- src/master/master.cpp | 65 +++++++----- src/tests/allocator_tests.cpp | 100 ++++++++--------- src/tests/master_authorization_tests.cpp | 6 +- src/tests/mesos.hpp | 34 ++---- src/tests/slave_recovery_tests.cpp | 4 +- 9 files changed, 166 insertions(+), 234 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/5e94bf09/docs/allocation-module.md ---------------------------------------------------------------------- diff --git a/docs/allocation-module.md b/docs/allocation-module.md index b308a23..bca54b0 100644 --- a/docs/allocation-module.md +++ b/docs/allocation-module.md @@ -48,21 +48,15 @@ Mesos is implemented in C++, so allocation modules are implemented in C++, and i const FrameworkID& frameworkId, const std::vector<Request>& requests) = 0; - // Whenever resources offered to a framework go unused (e.g., - // refused) the master invokes this callback. - virtual void resourcesUnused( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& resources, - const Option<Filters>& filters) = 0; - // Whenever resources are "recovered" in the cluster (e.g., a task // finishes, an offer is removed because a framework has failed or - // is failing over) the master invokes this callback. + // is failing over), or a framework refuses them, the master + // invokes this callback. virtual void resourcesRecovered( const FrameworkID& frameworkId, const SlaveID& slaveId, - const Resources& resources) = 0; + const Resources& resources, + const Option<Filters>& filters) = 0; // Whenever a framework that has filtered resources wants to revive // offers for those resources the master invokes this callback. @@ -129,4 +123,4 @@ Sorters are implemented in C++ and inherit the @Sorter@ class defined in @MESOS_ The default @Sorter@ is the DRFSorter, which implements fair sharing and can be found at @MESOS_HOME/src/master/drf_sorter.hpp@. -For DRF, if weights are specified in Sorter::add, a client's share will be divided by the weight, creating a form of priority. For example, a role that has a weight of 2 will be offered twice as many resources as a role with weight 1. \ No newline at end of file +For DRF, if weights are specified in Sorter::add, a client's share will be divided by the weight, creating a form of priority. For example, a role that has a weight of 2 will be offered twice as many resources as a role with weight 1. http://git-wip-us.apache.org/repos/asf/mesos/blob/5e94bf09/docs/mesos-c++-style-guide.md ---------------------------------------------------------------------- diff --git a/docs/mesos-c++-style-guide.md b/docs/mesos-c++-style-guide.md index 35a49c7..a5f8df8 100644 --- a/docs/mesos-c++-style-guide.md +++ b/docs/mesos-c++-style-guide.md @@ -30,17 +30,17 @@ The Mesos codebase follows the [Google C++ Style Guide](http://google-styleguide <pre> // 1: OK. -allocator->resourcesUnused(frameworkId, slaveId, resources, filters); +allocator->resourcesRecovered(frameworkId, slaveId, resources, filters); // 2: Don't use. -allocator->resourcesUnused(frameworkId, slaveId, +allocator->resourcesRecovered(frameworkId, slaveId, resources, filters); // 3: Don't use in this case due to "jaggedness". -allocator->resourcesUnused(frameworkId, - slaveId, - resources, - filters); +allocator->resourcesRecovered(frameworkId, + slaveId, + resources, + filters); // 3: In this case, 3 is OK. foobar(someArgument, @@ -48,14 +48,14 @@ foobar(someArgument, theLastArgument); // 4: OK. -allocator->resourcesUnused( +allocator->resourcesRecovered( frameworkId, slaveId, resources, filters); // 5: OK. -allocator->resourcesUnused( +allocator->resourcesRecovered( frameworkId, slaveId, resources, filters); </pre> @@ -69,4 +69,4 @@ Try<Duration> failoverTimeout = ## New Lines * 1 blank line at the end of the file. * Elements outside classes (classes, structs, global functions, etc.) should be spaced apart by 2 blank lines. -* Elements inside classes (member variables and functions) should not be spaced apart by more than 1 blank line. \ No newline at end of file +* Elements inside classes (member variables and functions) should not be spaced apart by more than 1 blank line. http://git-wip-us.apache.org/repos/asf/mesos/blob/5e94bf09/src/master/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator.hpp b/src/master/allocator.hpp index be6b038..02d20d0 100644 --- a/src/master/allocator.hpp +++ b/src/master/allocator.hpp @@ -103,21 +103,15 @@ public: const FrameworkID& frameworkId, const std::vector<Request>& requests) = 0; - // Whenever resources offered to a framework go unused (e.g., - // refused) the master invokes this callback. - virtual void resourcesUnused( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& resources, - const Option<Filters>& filters) = 0; - // Whenever resources are "recovered" in the cluster (e.g., a task // finishes, an offer is removed because a framework has failed or - // is failing over) the master invokes this callback. + // is failing over), or a framework refuses them, the master + // invokes this callback. virtual void resourcesRecovered( const FrameworkID& frameworkId, const SlaveID& slaveId, - const Resources& resources) = 0; + const Resources& resources, + const Option<Filters>& filters) = 0; // Whenever a framework that has filtered resources wants to revive // offers for those resources the master invokes this callback. @@ -180,17 +174,12 @@ public: const FrameworkID& frameworkId, const std::vector<Request>& requests); - void resourcesUnused( + void resourcesRecovered( const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, const Option<Filters>& filters); - void resourcesRecovered( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& resources); - void offersRevived( const FrameworkID& frameworkId); @@ -339,7 +328,7 @@ inline void Allocator::resourcesRequested( } -inline void Allocator::resourcesUnused( +inline void Allocator::resourcesRecovered( const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, @@ -347,7 +336,7 @@ inline void Allocator::resourcesUnused( { process::dispatch( process, - &AllocatorProcess::resourcesUnused, + &AllocatorProcess::resourcesRecovered, frameworkId, slaveId, resources, @@ -355,20 +344,6 @@ inline void Allocator::resourcesUnused( } -inline void Allocator::resourcesRecovered( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& resources) -{ - process::dispatch( - process, - &AllocatorProcess::resourcesRecovered, - frameworkId, - slaveId, - resources); -} - - inline void Allocator::offersRevived( const FrameworkID& frameworkId) { http://git-wip-us.apache.org/repos/asf/mesos/blob/5e94bf09/src/master/hierarchical_allocator_process.hpp ---------------------------------------------------------------------- diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp index 35d1579..c7e689e 100644 --- a/src/master/hierarchical_allocator_process.hpp +++ b/src/master/hierarchical_allocator_process.hpp @@ -158,17 +158,12 @@ public: const FrameworkID& frameworkId, const std::vector<Request>& requests); - void resourcesUnused( + void resourcesRecovered( const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, const Option<Filters>& filters); - void resourcesRecovered( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& resources); - void offersRevived( const FrameworkID& frameworkId); @@ -531,7 +526,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesRequested( template <class RoleSorter, class FrameworkSorter> void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesUnused( +HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesRecovered( const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, @@ -543,76 +538,6 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesUnused( return; } - VLOG(1) << "Framework " << frameworkId - << " left " << resources.allocatable() - << " unused on slave " << slaveId; - - // Update resources allocated to framework. It is - // not possible for the role to not be in roles - // because resourcesUnused is only called as the - // result of a valid task launch by an active - // framework that doesn't use the entire offer. - CHECK(frameworks.contains(frameworkId)); - const std::string& role = frameworks[frameworkId].role(); - sorters[role]->unallocated(frameworkId.value(), resources); - sorters[role]->remove(resources); - roleSorter->unallocated(role, resources); - - // Update resources allocatable on slave. - CHECK(slaves.contains(slaveId)); - slaves[slaveId].available += resources; - - // Create a refused resources filter. - Try<Duration> seconds_ = Duration::create(Filters().refuse_seconds()); - CHECK_SOME(seconds_); - Duration seconds = seconds_.get(); - - // Update the value of 'seconds' if the input isSome() and is - // valid. - if (filters.isSome()) { - seconds_ = Duration::create(filters.get().refuse_seconds()); - if (seconds_.isError()) { - LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " - << "the refused resources filter because the input value is " - << "invalid: " << seconds_.error(); - } else if (seconds_.get() < Duration::zero()) { - LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " - << "the refused resources filter because the input value is " - << "negative"; - } else { - seconds = seconds_.get(); - } - } - - if (seconds != Duration::zero()) { - LOG(INFO) << "Framework " << frameworkId - << " filtered slave " << slaveId - << " for " << seconds; - - // Create a new filter and delay it's expiration. - Filter* filter = - new RefusedFilter(slaveId, resources, process::Timeout::in(seconds)); - - frameworks[frameworkId].filters.insert(filter); - - delay(seconds, self(), &Self::expire, frameworkId, filter); - } -} - - -template <class RoleSorter, class FrameworkSorter> -void -HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesRecovered( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& resources) -{ - CHECK(initialized); - - if (resources.allocatable().size() == 0) { - return; - } - // Updated resources allocated to framework (if framework still // exists, which it might not in the event that we dispatched // Master::offer before we received AllocatorProcess::frameworkRemoved @@ -637,6 +562,45 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesRecovered( << ") on slave " << slaveId << " from framework " << frameworkId; } + + // Create a filter for this slave/framework pair if both exist. + if (frameworks.contains(frameworkId) && slaves.contains(slaveId)) { + // Create a refused resources filter. + Try<Duration> seconds_ = Duration::create(Filters().refuse_seconds()); + CHECK_SOME(seconds_); + Duration seconds = seconds_.get(); + + // Update the value of 'seconds' if the input isSome() and is + // valid. + if (filters.isSome()) { + seconds_ = Duration::create(filters.get().refuse_seconds()); + if (seconds_.isError()) { + LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " + << "the refused resources filter because the input value is " + << "invalid: " << seconds_.error(); + } else if (seconds_.get() < Duration::zero()) { + LOG(WARNING) << "Using the default value of 'refuse_seconds' to create " + << "the refused resources filter because the input value is " + << "negative"; + } else { + seconds = seconds_.get(); + } + } + + if (seconds != Duration::zero()) { + VLOG(1) << "Framework " << frameworkId + << " filtered slave " << slaveId + << " for " << seconds; + + // Create a new filter and delay its expiration. + Filter* filter = + new RefusedFilter(slaveId, resources, process::Timeout::in(seconds)); + + frameworks[frameworkId].filters.insert(filter); + + delay(seconds, self(), &Self::expire, frameworkId, filter); + } + } } http://git-wip-us.apache.org/repos/asf/mesos/blob/5e94bf09/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index c44896e..a925a93 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1460,7 +1460,10 @@ void Master::_reregisterFramework( // those messages since it wasn't connected to the master. foreach (Offer* offer, utils::copy(framework->offers)) { allocator->resourcesRecovered( - offer->framework_id(), offer->slave_id(), offer->resources()); + offer->framework_id(), + offer->slave_id(), + offer->resources(), + None()); removeOffer(offer, true); // Rescind. } @@ -1595,7 +1598,7 @@ void Master::deactivate(Framework* framework) // Remove the framework's offers. foreach (Offer* offer, utils::copy(framework->offers)) { allocator->resourcesRecovered( - offer->framework_id(), offer->slave_id(), offer->resources()); + offer->framework_id(), offer->slave_id(), offer->resources(), None()); removeOffer(offer, true); // Rescind. } } @@ -1618,7 +1621,7 @@ void Master::disconnect(Slave* slave) // Remove and rescind offers. foreach (Offer* offer, utils::copy(slave->offers)) { allocator->resourcesRecovered( - offer->framework_id(), slave->id, offer->resources()); + offer->framework_id(), slave->id, offer->resources(), None()); removeOffer(offer, true); // Rescind! } @@ -2095,7 +2098,10 @@ void Master::launchTasks( if (offer != NULL) { if (error.isSome()) { allocator->resourcesRecovered( - offer->framework_id(), offer->slave_id(), offer->resources()); + offer->framework_id(), + offer->slave_id(), + offer->resources(), + None()); } removeOffer(offer); } @@ -2139,7 +2145,7 @@ void Master::launchTasks( // Wait for all the tasks to be validated. // NOTE: We wait for all tasks because currently the allocator - // is expected to get 'resourcesUnused()' once per 'launchTasks()'. + // is expected to get 'resourcesRecovered()' once per 'launchTasks()'. await(futures) .onAny(defer(self(), &Master::_launchTasks, @@ -2309,7 +2315,7 @@ void Master::_launchTasks( << " because the framework cannot be found"; // Tell the allocator about the recovered resources. - allocator->resourcesRecovered(frameworkId, slaveId, totalResources); + allocator->resourcesRecovered(frameworkId, slaveId, totalResources, None()); return; } @@ -2328,7 +2334,7 @@ void Master::_launchTasks( } // Tell the allocator about the recovered resources. - allocator->resourcesRecovered(frameworkId, slaveId, totalResources); + allocator->resourcesRecovered(frameworkId, slaveId, totalResources, None()); return; } @@ -2404,7 +2410,7 @@ void Master::_launchTasks( if (unusedResources.allocatable().size() > 0) { // Tell the allocator about the unused (e.g., refused) resources. - allocator->resourcesUnused(frameworkId, slaveId, unusedResources, filters); + allocator->resourcesRecovered(frameworkId, slaveId, unusedResources, filters); } } @@ -3182,7 +3188,7 @@ void Master::exitedExecutor( << WSTRINGIFY(status); allocator->resourcesRecovered( - frameworkId, slaveId, Resources(executor.resources())); + frameworkId, slaveId, Resources(executor.resources()), None()); // Remove executor from slave and framework. slave->removeExecutor(frameworkId, executorId); @@ -3381,7 +3387,7 @@ void Master::offer(const FrameworkID& frameworkId, << " has terminated or is inactive"; foreachpair (const SlaveID& slaveId, const Resources& offered, resources) { - allocator->resourcesRecovered(frameworkId, slaveId, offered); + allocator->resourcesRecovered(frameworkId, slaveId, offered, None()); } return; } @@ -3396,7 +3402,7 @@ void Master::offer(const FrameworkID& frameworkId, << frameworkId << " because slave " << slaveId << " is not valid"; - allocator->resourcesRecovered(frameworkId, slaveId, offered); + allocator->resourcesRecovered(frameworkId, slaveId, offered, None()); continue; } @@ -3412,7 +3418,7 @@ void Master::offer(const FrameworkID& frameworkId, LOG(WARNING) << "Master returning resources offered because slave " << *slave << " is disconnected"; - allocator->resourcesRecovered(frameworkId, slaveId, offered); + allocator->resourcesRecovered(frameworkId, slaveId, offered, None()); continue; } @@ -3432,11 +3438,13 @@ void Master::offer(const FrameworkID& frameworkId, LOG(WARNING) << "Master returning resources offered because slave " << *slave << " has reached the maximum number of " << "executors"; - allocator->resourcesRecovered(frameworkId, slaveId, offered); + // Pass a default filter to avoid getting this same offer immediately + // from the allocator. + allocator->resourcesRecovered(frameworkId, slaveId, offered, Filters()); continue; } } -#endif // WITH_NETWORK_ISOLATOR +#endif // WITH_NETWORK_ISOLATOR Offer* offer = new Offer(); offer->mutable_id()->MergeFrom(newOfferId()); @@ -3687,7 +3695,8 @@ void Master::reconcile( allocator->resourcesRecovered( frameworkId, slave->id, - slave->executors[frameworkId][executorId].resources()); + slave->executors[frameworkId][executorId].resources(), + None()); slave->removeExecutor(frameworkId, executorId); @@ -3839,7 +3848,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) // these resources to this framework if it wants. foreach (Offer* offer, utils::copy(framework->offers)) { allocator->resourcesRecovered( - offer->framework_id(), offer->slave_id(), offer->resources()); + offer->framework_id(), offer->slave_id(), offer->resources(), None()); removeOffer(offer); } @@ -3891,7 +3900,8 @@ void Master::removeFramework(Framework* framework) foreach (Offer* offer, utils::copy(framework->offers)) { allocator->resourcesRecovered(offer->framework_id(), offer->slave_id(), - Resources(offer->resources())); + Resources(offer->resources()), + None()); removeOffer(offer); } @@ -3902,9 +3912,11 @@ void Master::removeFramework(Framework* framework) foreachpair (const ExecutorID& executorId, const ExecutorInfo& executorInfo, framework->executors[slaveId]) { - allocator->resourcesRecovered(framework->id, - slave->id, - executorInfo.resources()); + allocator->resourcesRecovered( + framework->id, + slave->id, + executorInfo.resources(), + None()); slave->removeExecutor(framework->id, executorId); } } @@ -3989,7 +4001,8 @@ void Master::removeFramework(Slave* slave, Framework* framework) allocator->resourcesRecovered( framework->id, slave->id, - slave->executors[framework->id][executorId].resources()); + slave->executors[framework->id][executorId].resources(), + None()); framework->removeExecutor(slave->id, executorId); slave->removeExecutor(framework->id, executorId); @@ -4168,7 +4181,7 @@ void Master::removeSlave(Slave* slave) // TODO(vinod): We don't need to call 'Allocator::resourcesRecovered' // once MESOS-621 is fixed. allocator->resourcesRecovered( - offer->framework_id(), slave->id, offer->resources()); + offer->framework_id(), slave->id, offer->resources(), None()); // Remove and rescind offers. removeOffer(offer, true); // Rescind! @@ -4184,7 +4197,8 @@ void Master::removeSlave(Slave* slave) allocator->resourcesRecovered( frameworkId, slave->id, - slave->executors[frameworkId][executorId].resources()); + slave->executors[frameworkId][executorId].resources(), + None()); framework->removeExecutor(slave->id, executorId); } @@ -4282,7 +4296,10 @@ void Master::removeTask(Task* task) // Tell the allocator about the recovered resources. allocator->resourcesRecovered( - task->framework_id(), task->slave_id(), Resources(task->resources())); + task->framework_id(), + task->slave_id(), + Resources(task->resources()), + None()); // Update the task state metric. switch (task->state()) { http://git-wip-us.apache.org/repos/asf/mesos/blob/5e94bf09/src/tests/allocator_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/allocator_tests.cpp b/src/tests/allocator_tests.cpp index b920533..9ff4dfd 100644 --- a/src/tests/allocator_tests.cpp +++ b/src/tests/allocator_tests.cpp @@ -294,7 +294,7 @@ TEST_F(DRFAllocatorTest, DRFAllocatorProcess) EXPECT_THAT(offers5.get(), OfferEq(1, 512)); // Shut everything down. - EXPECT_CALL(allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(allocator, frameworkDeactivated(_)) @@ -397,7 +397,7 @@ TEST_F(DRFAllocatorTest, SameShareAllocations) .WillRepeatedly(DoAll(Increment(&allocations2), DeclineOffers(filters))); - EXPECT_CALL(allocator, resourcesUnused(_, _, _, _)) + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); // Start the slave. @@ -565,7 +565,7 @@ TEST_F(ReservationAllocatorTest, ReservedResources) AWAIT_READY(resourceOffers4); // Shut everything down. - EXPECT_CALL(allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(allocator, frameworkDeactivated(_)) @@ -652,8 +652,8 @@ TEST_F(ReservationAllocatorTest, ResourcesReturned) EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 400))) .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 100, "role1")); - EXPECT_CALL(allocator, resourcesUnused(_, _, _, _)) - .WillOnce(InvokeUnusedWithFilters(&allocator, 0)); + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)) + .WillOnce(InvokeResourcesRecoveredWithFilters(&allocator, 0)); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -710,7 +710,7 @@ TEST_F(ReservationAllocatorTest, ResourcesReturned) status.mutable_task_id()->MergeFrom(taskInfo.task_id()); status.set_state(TASK_FINISHED); - EXPECT_CALL(allocator, resourcesRecovered(_, _, _)); + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)); // After the task finishes, its resources should be reoffered to // framework1. @@ -723,7 +723,7 @@ TEST_F(ReservationAllocatorTest, ResourcesReturned) AWAIT_READY(resourceOffers3); // Shut everything down. - EXPECT_CALL(allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(allocator, frameworkDeactivated(_)) @@ -811,7 +811,7 @@ TYPED_TEST(AllocatorTest, MockAllocator) AWAIT_READY(resourceOffers); // Shut everything down. - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(this->allocator, frameworkDeactivated(_)) @@ -871,10 +871,10 @@ TYPED_TEST(AllocatorTest, ResourcesUnused) EXPECT_CALL(sched1, resourceOffers(_, OfferEq(2, 1024))) .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*")); - Future<Nothing> resourcesUnused; - EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _)) - .WillOnce(DoAll(InvokeResourcesUnused(&this->allocator), - FutureSatisfy(&resourcesUnused))); + Future<Nothing> resourcesRecovered; + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) + .WillOnce(DoAll(InvokeResourcesRecovered(&this->allocator), + FutureSatisfy(&resourcesRecovered))); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -889,7 +889,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnused) // We need to wait until the allocator knows about the unused // resources to start the second framework so that we get the // expected offer. - AWAIT_READY(resourcesUnused); + AWAIT_READY(resourcesRecovered); FrameworkInfo frameworkInfo2; // Bug in gcc 4.1.*, must assign on next line. frameworkInfo2 = DEFAULT_FRAMEWORK_INFO; @@ -915,7 +915,7 @@ TYPED_TEST(AllocatorTest, ResourcesUnused) AWAIT_READY(resourceOffers); // Shut everything down. - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(this->allocator, frameworkDeactivated(_)) @@ -994,7 +994,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDispatch) FrameworkID frameworkId; SlaveID slaveId; Resources savedResources; - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) // "Catches" the resourcesRecovered call from the master, so // that it doesn't get processed until we redispatch it after // the frameworkRemoved trigger. @@ -1014,13 +1014,13 @@ TYPED_TEST(AllocatorTest, OutOfOrderDispatch) AWAIT_READY(frameworkRemoved); - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillOnce(DoDefault()); // Re-dispatch the resourcesRecovered call which we "caught" // earlier now that the framework has been removed, to test // that recovering resources from a removed framework works. - this->a->resourcesRecovered(frameworkId, slaveId, savedResources); + this->a->resourcesRecovered(frameworkId, slaveId, savedResources, None()); // TODO(benh): Seems like we should wait for the above // resourcesRecovered to be executed. @@ -1051,7 +1051,7 @@ TYPED_TEST(AllocatorTest, OutOfOrderDispatch) AWAIT_READY(resourceOffers); // Shut everything down. - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(this->allocator, frameworkDeactivated(_)) @@ -1120,10 +1120,10 @@ TYPED_TEST(AllocatorTest, SchedulerFailover) // We don't filter the unused resources to make sure that // they get offered to the framework as soon as it fails over. - EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _)) - .WillOnce(InvokeUnusedWithFilters(&this->allocator, 0)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) + .WillOnce(InvokeResourcesRecoveredWithFilters(&this->allocator, 0)) // For subsequent offers. - .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 0)); + .WillRepeatedly(InvokeResourcesRecoveredWithFilters(&this->allocator, 0)); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -1177,7 +1177,7 @@ TYPED_TEST(AllocatorTest, SchedulerFailover) AWAIT_READY(resourceOffers); // Shut everything down. - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(this->allocator, frameworkDeactivated(_)) @@ -1255,10 +1255,10 @@ TYPED_TEST(AllocatorTest, FrameworkExited) .WillOnce(LaunchTasks(executor1, 1, 2, 512, "*")); // The framework does not use all the resources. - Future<Nothing> resourcesUnused; - EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _)) - .WillOnce(DoAll(InvokeResourcesUnused(&this->allocator), - FutureSatisfy(&resourcesUnused))); + Future<Nothing> resourcesRecovered; + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) + .WillOnce(DoAll(InvokeResourcesRecovered(&this->allocator), + FutureSatisfy(&resourcesRecovered))); EXPECT_CALL(exec1, registered(_, _, _, _)); @@ -1276,7 +1276,7 @@ TYPED_TEST(AllocatorTest, FrameworkExited) // We need to wait until the allocator knows about the unused // resources to start the second framework so that we get the // expected offer. - AWAIT_READY(resourcesUnused); + AWAIT_READY(resourcesRecovered); MockScheduler sched2; MesosSchedulerDriver driver2( @@ -1298,7 +1298,7 @@ TYPED_TEST(AllocatorTest, FrameworkExited) EXPECT_CALL(sched2, resourceOffers(_, OfferEq(1, 512))) .WillOnce(LaunchTasks(executor2, 1, 1, 256, "*")); - EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _)); + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)); EXPECT_CALL(exec2, registered(_, _, _, _)); @@ -1311,7 +1311,7 @@ TYPED_TEST(AllocatorTest, FrameworkExited) // Shut everything down but check that framework 2 gets the // resources from framework 1 after it is shutdown. - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(this->allocator, frameworkDeactivated(_)) @@ -1381,7 +1381,7 @@ TYPED_TEST(AllocatorTest, SlaveLost) EXPECT_CALL(sched, resourceOffers(_, OfferEq(2, 1024))) .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 512, "*")); - EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _)); + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -1401,7 +1401,7 @@ TYPED_TEST(AllocatorTest, SlaveLost) // is killed). AWAIT_READY(launchTask); - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .Times(2); Future<Nothing> slaveRemoved; @@ -1439,7 +1439,7 @@ TYPED_TEST(AllocatorTest, SlaveLost) Resources::parse(flags2.resources.get()).get()); // Shut everything down. - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(this->allocator, frameworkDeactivated(_)) @@ -1503,9 +1503,9 @@ TYPED_TEST(AllocatorTest, SlaveAdded) // on slave1 from the task launch won't get reoffered // immediately and will get combined with slave2's // resources for a single offer. - EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _)) - .WillOnce(InvokeUnusedWithFilters(&this->allocator, 0.1)) - .WillRepeatedly(InvokeUnusedWithFilters(&this->allocator, 0)); + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) + .WillOnce(InvokeResourcesRecoveredWithFilters(&this->allocator, 0.1)) + .WillRepeatedly(InvokeResourcesRecoveredWithFilters(&this->allocator, 0)); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -1538,7 +1538,7 @@ TYPED_TEST(AllocatorTest, SlaveAdded) AWAIT_READY(resourceOffers); // Shut everything down. - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(this->allocator, frameworkDeactivated(_)) @@ -1604,10 +1604,10 @@ TYPED_TEST(AllocatorTest, TaskFinished) // don't send the TASK_FINISHED status update below until after the // allocator knows about the unused resources so that it can // aggregate them with the resources from the finished task. - Future<Nothing> resourcesUnused; - EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _)) - .WillRepeatedly(DoAll(InvokeResourcesUnused(&this->allocator), - FutureSatisfy(&resourcesUnused))); + Future<Nothing> resourcesRecovered; + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) + .WillRepeatedly(DoAll(InvokeResourcesRecovered(&this->allocator), + FutureSatisfy(&resourcesRecovered))); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -1628,13 +1628,13 @@ TYPED_TEST(AllocatorTest, TaskFinished) AWAIT_READY(launchTask); - AWAIT_READY(resourcesUnused); + AWAIT_READY(resourcesRecovered); TaskStatus status; status.mutable_task_id()->MergeFrom(taskInfo.task_id()); status.set_state(TASK_FINISHED); - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)); + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)); // After the first task gets killed. Future<Nothing> resourceOffers; @@ -1646,7 +1646,7 @@ TYPED_TEST(AllocatorTest, TaskFinished) AWAIT_READY(resourceOffers); // Shut everything down. - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(this->allocator, frameworkDeactivated(_)) @@ -1748,7 +1748,7 @@ TYPED_TEST(AllocatorTest, WhitelistSlave) Clock::resume(); // Shut everything down. - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(this->allocator, frameworkDeactivated(_)) @@ -1885,7 +1885,7 @@ TYPED_TEST(AllocatorTest, FrameworkReregistersFirst) .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 500, "*")) .WillRepeatedly(DeclineOffers()); - EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _)); + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -1909,7 +1909,7 @@ TYPED_TEST(AllocatorTest, FrameworkReregistersFirst) // it doesn't try to retry the update after master failover. AWAIT_READY(_statusUpdateAcknowledgement); - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); this->ShutdownMasters(); @@ -1951,7 +1951,7 @@ TYPED_TEST(AllocatorTest, FrameworkReregistersFirst) EXPECT_THAT(resourceOffers2.get(), OfferEq(1, 524)); // Shut everything down. - EXPECT_CALL(allocator2, resourcesRecovered(_, _, _)) + EXPECT_CALL(allocator2, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(allocator2, frameworkDeactivated(_)) @@ -2001,7 +2001,7 @@ TYPED_TEST(AllocatorTest, SlaveReregistersFirst) EXPECT_CALL(this->allocator, frameworkAdded(_, _, _)); - EXPECT_CALL(this->allocator, resourcesUnused(_, _, _, _)); + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)); EXPECT_CALL(sched, registered(&driver, _, _)); @@ -2033,7 +2033,7 @@ TYPED_TEST(AllocatorTest, SlaveReregistersFirst) // it doesn't try to retry the update after master failover. AWAIT_READY(_statusUpdateAcknowledgement); - EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(this->allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); this->ShutdownMasters(); @@ -2075,7 +2075,7 @@ TYPED_TEST(AllocatorTest, SlaveReregistersFirst) EXPECT_THAT(resourceOffers2.get(), OfferEq(1, 524)); // Shut everything down. - EXPECT_CALL(allocator2, resourcesRecovered(_, _, _)) + EXPECT_CALL(allocator2, resourcesRecovered(_, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(allocator2, frameworkDeactivated(_)) http://git-wip-us.apache.org/repos/asf/mesos/blob/5e94bf09/src/tests/master_authorization_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index 009afa4..5c35577 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -271,15 +271,15 @@ TEST_F(MasterAuthorizationTest, KillTask) AWAIT_READY(status); EXPECT_EQ(TASK_KILLED, status.get().state()); - Future<Nothing> resourcesUnused = - FUTURE_DISPATCH(_, &AllocatorProcess::resourcesUnused); + Future<Nothing> resourcesRecovered = + FUTURE_DISPATCH(_, &AllocatorProcess::resourcesRecovered); // Now complete authorization. promise.set(true); // No task launch should happen resulting in all resources being // returned to the allocator. - AWAIT_READY(resourcesUnused); + AWAIT_READY(resourcesRecovered); driver.stop(); driver.join(); http://git-wip-us.apache.org/repos/asf/mesos/blob/5e94bf09/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 75c66fd..8cf71d1 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -560,10 +560,7 @@ public: ON_CALL(*this, resourcesRequested(_, _)) .WillByDefault(InvokeResourcesRequested(this)); - ON_CALL(*this, resourcesUnused(_, _, _, _)) - .WillByDefault(InvokeResourcesUnused(this)); - - ON_CALL(*this, resourcesRecovered(_, _, _)) + ON_CALL(*this, resourcesRecovered(_, _, _, _)) .WillByDefault(InvokeResourcesRecovered(this)); ON_CALL(*this, offersRevived(_)) @@ -595,13 +592,10 @@ public: MOCK_METHOD1(updateWhitelist, void(const Option<hashset<std::string> >&)); MOCK_METHOD2(resourcesRequested, void(const FrameworkID&, const std::vector<Request>&)); - MOCK_METHOD4(resourcesUnused, void(const FrameworkID&, - const SlaveID&, - const Resources&, - const Option<Filters>& filters)); - MOCK_METHOD3(resourcesRecovered, void(const FrameworkID&, + MOCK_METHOD4(resourcesRecovered, void(const FrameworkID&, const SlaveID&, - const Resources&)); + const Resources&, + const Option<Filters>& filters)); MOCK_METHOD1(offersRevived, void(const FrameworkID&)); T real; @@ -724,12 +718,11 @@ ACTION_P(InvokeResourcesRequested, allocator) } - -ACTION_P(InvokeResourcesUnused, allocator) +ACTION_P(InvokeResourcesRecovered, allocator) { process::dispatch( allocator->real, - &master::allocator::AllocatorProcess::resourcesUnused, + &master::allocator::AllocatorProcess::resourcesRecovered, arg0, arg1, arg2, @@ -737,14 +730,14 @@ ACTION_P(InvokeResourcesUnused, allocator) } -ACTION_P2(InvokeUnusedWithFilters, allocator, timeout) +ACTION_P2(InvokeResourcesRecoveredWithFilters, allocator, timeout) { Filters filters; filters.set_refuse_seconds(timeout); process::dispatch( allocator->real, - &master::allocator::AllocatorProcess::resourcesUnused, + &master::allocator::AllocatorProcess::resourcesRecovered, arg0, arg1, arg2, @@ -752,17 +745,6 @@ ACTION_P2(InvokeUnusedWithFilters, allocator, timeout) } -ACTION_P(InvokeResourcesRecovered, allocator) -{ - process::dispatch( - allocator->real, - &master::allocator::AllocatorProcess::resourcesRecovered, - arg0, - arg1, - arg2); -} - - ACTION_P(InvokeOffersRevived, allocator) { process::dispatch( http://git-wip-us.apache.org/repos/asf/mesos/blob/5e94bf09/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 8d48aed..b53353c 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -2297,7 +2297,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave) FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); EXPECT_CALL(allocator, slaveActivated(_)); - EXPECT_CALL(allocator, resourcesRecovered(_, _, _)); + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)); Future<TaskStatus> status; EXPECT_CALL(sched, statusUpdate(_, _)) @@ -2348,7 +2348,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave) // If there was an outstanding offer, we can get a call to // resourcesRecovered when we stop the scheduler. - EXPECT_CALL(allocator, resourcesRecovered(_, _, _)) + EXPECT_CALL(allocator, resourcesRecovered(_, _, _, _)) .WillRepeatedly(Return()); driver.stop();
