Maintenance Primitives: Added updateUnavailability to master. Review: https://reviews.apache.org/r/37175
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f87f733d Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f87f733d Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f87f733d Branch: refs/heads/master Commit: f87f733dbd34e39c91125fabe541269aea806266 Parents: ea48105 Author: Joris Van Remoortere <[email protected]> Authored: Tue Aug 25 18:40:10 2015 -0400 Committer: Joris Van Remoortere <[email protected]> Committed: Mon Sep 14 13:58:37 2015 -0400 ---------------------------------------------------------------------- include/mesos/master/allocator.hpp | 8 +++ src/master/allocator/mesos/allocator.hpp | 21 +++++++ src/master/allocator/mesos/hierarchical.hpp | 29 +++++++++ src/master/http.cpp | 15 ++--- src/master/master.cpp | 77 ++++++++++++++++++++++++ src/master/master.hpp | 4 ++ src/tests/master_maintenance_tests.cpp | 21 ++++--- src/tests/mesos.hpp | 15 +++++ 8 files changed, 174 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/include/mesos/master/allocator.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp index 257d2f6..b5bfc28 100644 --- a/include/mesos/master/allocator.hpp +++ b/include/mesos/master/allocator.hpp @@ -135,6 +135,14 @@ public: const SlaveID& slaveId, const std::vector<Offer::Operation>& operations) = 0; + // We currently support storing the next unavailability, if there is one, per + // slave. If `unavailability` is not set then there is no known upcoming + // unavailability. This might require the implementation of the function to + // remove any inverse offers that are outstanding. 
+ virtual void updateUnavailability( + const SlaveID& slaveId, + const Option<Unavailability>& unavailability) = 0; + // Informs the Allocator to recover resources that are considered // used by the framework. virtual void recoverResources( http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/allocator/mesos/allocator.hpp ---------------------------------------------------------------------- diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp index c845723..ee6ec58 100644 --- a/src/master/allocator/mesos/allocator.hpp +++ b/src/master/allocator/mesos/allocator.hpp @@ -108,6 +108,10 @@ public: const SlaveID& slaveId, const std::vector<Offer::Operation>& operations); + void updateUnavailability( + const SlaveID& slaveId, + const Option<Unavailability>& unavailability); + void recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, @@ -199,6 +203,10 @@ public: const SlaveID& slaveId, const std::vector<Offer::Operation>& operations) = 0; + virtual void updateUnavailability( + const SlaveID& slaveId, + const Option<Unavailability>& unavailability) = 0; + virtual void recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, @@ -433,6 +441,19 @@ MesosAllocator<AllocatorProcess>::updateAvailable( template <typename AllocatorProcess> +inline void MesosAllocator<AllocatorProcess>::updateUnavailability( + const SlaveID& slaveId, + const Option<Unavailability>& unavailability) +{ + return process::dispatch( + process, + &MesosAllocatorProcess::updateUnavailability, + slaveId, + unavailability); +} + + +template <typename AllocatorProcess> inline void MesosAllocator<AllocatorProcess>::recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/allocator/mesos/hierarchical.hpp ---------------------------------------------------------------------- diff --git 
a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index f86a701..77a5b4c 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -146,6 +146,10 @@ public: const SlaveID& slaveId, const std::vector<Offer::Operation>& operations); + void updateUnavailability( + const SlaveID& slaveId, + const Option<Unavailability>& unavailability); + void recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, @@ -816,6 +820,31 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateAvailable( template <class RoleSorter, class FrameworkSorter> void +HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability( + const SlaveID& slaveId, + const Option<Unavailability>& unavailability) +{ + CHECK(initialized); + CHECK(slaves.contains(slaveId)); + + // NOTE: We currently implement maintenance in the allocator to be able to + // leverage state and features such as the FrameworkSorter and Filters. + + // Remove any old unavailability. + slaves[slaveId].maintenance = None(); + + // If we have a new unavailability. + if (unavailability.isSome()) { + slaves[slaveId].maintenance = + typename Slave::Maintenance(unavailability.get()); + } + + allocate(slaveId); +} + + +template <class RoleSorter, class FrameworkSorter> +void HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index a814930..05b590e 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -1494,19 +1494,19 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const } } - // NOTE: Copies are needed because this loop modifies the container. 
+ // NOTE: Copies are needed because `updateUnavailability()` in this loop + // modifies the container. foreachkey (const MachineID& id, utils::copy(master->machines)) { // Update the entry for each updated machine. if (updated.contains(id)) { - master->machines[id] - .info.mutable_unavailability()->CopyFrom(updated[id]); - + master->updateUnavailability(id, updated[id]); continue; } - // Remove the unavailability for each removed machine. - master->machines[id].info.clear_unavailability(); + // Transition each removed machine back to the `UP` mode and remove the + // unavailability. master->machines[id].info.set_mode(MachineInfo::UP); + master->updateUnavailability(id, None()); } // Save each new machine, with the unavailability @@ -1516,9 +1516,10 @@ Future<Response> Master::Http::maintenanceSchedule(const Request& request) const MachineInfo info; info.mutable_id()->CopyFrom(id); info.set_mode(MachineInfo::DRAINING); - info.mutable_unavailability()->CopyFrom(window.unavailability()); master->machines[id].info.CopyFrom(info); + + master->updateUnavailability(id, window.unavailability()); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 1bed6a6..0b3ba56 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -50,6 +50,7 @@ #include <process/metrics/metrics.hpp> #include <stout/check.hpp> +#include <stout/duration.hpp> #include <stout/error.hpp> #include <stout/ip.hpp> #include <stout/lambda.hpp> @@ -3846,6 +3847,26 @@ void Master::reregisterSlave( // based authentication). LOG(INFO) << "Re-registering slave " << *slave; + // We don't allow re-registering this way with a different IP or + // hostname. This is because maintenance is scheduled at the + // machine level; so we would need to re-validate the slave's + // unavailability if the machine it is running on changed. 
+ if (slave->pid.address.ip != from.address.ip || + slave->info.hostname() != slaveInfo.hostname()) { + LOG(WARNING) << "Slave " << slaveInfo.id() << " at " << from + << " (" << slaveInfo.hostname() << ") attempted to " + << "re-register with different IP / hostname; expected " + << slave->pid.address.ip << " (" << slave->info.hostname() + << ") shutting it down"; + + ShutdownMessage message; + message.set_message( + "Slave attempted to re-register with different IP / hostname"); + + send(from, message); + return; + } + // Update the slave pid and relink to it. // NOTE: Re-linking the slave here always rather than only when // the slave is disconnected can lead to multiple exited events @@ -4102,6 +4123,62 @@ void Master::updateSlave( } +void Master::updateUnavailability( + const MachineID& machineId, + const Option<Unavailability>& unavailability) +{ + if (unavailability.isSome()) { + machines[machineId].info.mutable_unavailability()->CopyFrom( + unavailability.get()); + } else { + machines[machineId].info.clear_unavailability(); + } + + // TODO(jmlvanre): Only update allocator and rescind offers if the + // unavailability has actually changed. + if (machines.contains(machineId)) { + // For every slave on this machine, update the allocator. + foreach (const SlaveID& slaveId, machines[machineId].slaves) { + // The slave should not be in the machines mapping if it is removed. + CHECK(slaves.removed.get(slaveId).isNone()); + + // The slave should be registered if it is in the machines mapping. + CHECK(slaves.registered.contains(slaveId)); + + Slave* slave = CHECK_NOTNULL(slaves.registered.get(slaveId)); + + if (unavailability.isSome()) { + // TODO(jmlvanre): Add stream operator for unavailability. 
+ LOG(INFO) << "Updating unavailability of slave " << *slave + << ", starting at " + << Nanoseconds(unavailability.get().start().nanoseconds()); + } else { + LOG(INFO) << "Removing unavailability of slave " << *slave; + } + + // Remove and rescind offers since we want to inform frameworks of the + // unavailability change as soon as possible. + foreach (Offer* offer, utils::copy(slave->offers)) { + allocator->recoverResources( + offer->framework_id(), slave->id, offer->resources(), None()); + + removeOffer(offer, true); // Rescind! + } + + // We remove / rescind all the offers first so that any calls to the + // allocator to modify its internal state are queued before the update of + // the unavailability in the allocator. We do this so that the allocator's + // state can start from a "clean slate" for the new unavailability. + // NOTE: Any calls from the Allocator back into the master, for example + // `offer()`, are guaranteed to happen after this function exits due to + // the Actor pattern. + + allocator->updateUnavailability(slaveId, unavailability); + } + } +} + + // TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid' // because the status updates will be sent by the slave. 
void Master::statusUpdate(const StatusUpdate& update, const UPID& pid) http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index d7d27bd..cd71a25 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -468,6 +468,10 @@ public: const SlaveID& slaveId, const Resources& oversubscribedResources); + void updateUnavailability( + const MachineID& machineId, + const Option<Unavailability>& unavailability); + void shutdownSlave( const SlaveID& slaveId, const std::string& message); http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/tests/master_maintenance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp index 5811446..a857ab9 100644 --- a/src/tests/master_maintenance_tests.cpp +++ b/src/tests/master_maintenance_tests.cpp @@ -327,6 +327,12 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest) .WillOnce(FutureArg<1>(&unavailabilityOffers)) .WillRepeatedly(Return()); // Ignore subsequent offers. + // The original offers should be rescinded when the unavailability + // is changed. + Future<Nothing> offerRescinded; + EXPECT_CALL(sched, offerRescinded(&driver, _)) + .WillOnce(FutureSatisfy(&offerRescinded)); + // Start the test. driver.start(); @@ -337,10 +343,6 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest) // Check that unavailability is not set. foreach (const Offer& offer, normalOffers.get()) { EXPECT_FALSE(offer.has_unavailability()); - - // We have a few seconds between allocations (by default). That should - // be enough time to post a schedule before the next allocation. - driver.declineOffer(offer.id()); } // Schedule this slave for maintenance. 
@@ -355,9 +357,13 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest) const Unavailability unavailability = createUnavailability(start, duration); // Post a valid schedule with one machine. - maintenance::Schedule schedule = createSchedule({ - createWindow({machine}, unavailability)}); + maintenance::Schedule schedule = createSchedule( + {createWindow({machine}, unavailability)}); + // We have a few seconds between the first set of offers and the + // next allocation of offers. This should be enough time to perform + // a maintenance schedule update. This update will also trigger the + // rescinding of offers from the scheduled slave. Future<Response> response = process::http::post( master.get(), "maintenance/schedule", @@ -366,9 +372,6 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest) AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); - // Speed up the test by not waiting until the next allocation. - driver.reviveOffers(); - // Wait for some offers. AWAIT_READY(unavailabilityOffers); EXPECT_NE(0u, unavailabilityOffers.get().size()); http://git-wip-us.apache.org/repos/asf/mesos/blob/f87f733d/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 4b65440..477b7e4 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -1353,6 +1353,12 @@ ACTION_P(InvokeUpdateResources, allocator) } +ACTION_P(InvokeUpdateUnavailability, allocator) +{ + return allocator->real->updateUnavailability(arg0, arg1); +} + + ACTION_P(InvokeRecoverResources, allocator) { allocator->real->recoverResources(arg0, arg1, arg2, arg3); @@ -1476,6 +1482,11 @@ public: EXPECT_CALL(*this, updateAvailable(_, _)) .WillRepeatedly(DoDefault()); + ON_CALL(*this, updateUnavailability(_, _)) + .WillByDefault(InvokeUpdateUnavailability(this)); + EXPECT_CALL(*this, updateUnavailability(_, _)) + .WillRepeatedly(DoDefault()); + ON_CALL(*this, recoverResources(_, _, _, _)) 
.WillByDefault(InvokeRecoverResources(this)); EXPECT_CALL(*this, recoverResources(_, _, _, _)) @@ -1550,6 +1561,10 @@ public: const SlaveID&, const std::vector<Offer::Operation>&)); + MOCK_METHOD2(updateUnavailability, void( + const SlaveID&, + const Option<Unavailability>&)); + MOCK_METHOD4(recoverResources, void( const FrameworkID&, const SlaveID&,
