Maintained persisted resources in master memory. Review: https://reviews.apache.org/r/28781
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cd954386 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cd954386 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cd954386 Branch: refs/heads/master Commit: cd9543864c6062ebe05714762116adc20ce02bae Parents: 19f6e17 Author: Jie Yu <[email protected]> Authored: Wed Jan 28 16:10:26 2015 -0800 Committer: Jie Yu <[email protected]> Committed: Thu Jan 29 11:43:23 2015 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 34 ++++++++++++++++++++++++++++++++-- src/master/master.hpp | 12 ++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/cd954386/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index bedafaf..1005686 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -557,11 +557,13 @@ void Master::initialize() install<RegisterSlaveMessage>( &Master::registerSlave, &RegisterSlaveMessage::slave, + &RegisterSlaveMessage::persisted_resources, &RegisterSlaveMessage::version); install<ReregisterSlaveMessage>( &Master::reregisterSlave, &ReregisterSlaveMessage::slave, + &ReregisterSlaveMessage::persisted_resources, &ReregisterSlaveMessage::executor_infos, &ReregisterSlaveMessage::tasks, &ReregisterSlaveMessage::completed_frameworks, @@ -3251,6 +3253,7 @@ void Master::schedulerMessage( void Master::registerSlave( const UPID& from, const SlaveInfo& slaveInfo, + const vector<Resource>& persistedResources, const string& version) { ++metrics.messages_register_slave; @@ -3260,7 +3263,12 @@ void Master::registerSlave( << " because authentication is still in progress"; authenticating[from] - .onReady(defer(self(), &Self::registerSlave, from, slaveInfo, version)); + .onReady(defer(self(), + &Self::registerSlave, + from, + slaveInfo, + persistedResources, + version)); return; } @@ -3325,6 +3333,7 @@ void Master::registerSlave( &Self::_registerSlave, slaveInfo_, from, + persistedResources, version, lambda::_1)); } @@ -3333,6 +3342,7 @@ void Master::registerSlave( void Master::_registerSlave( const SlaveInfo& slaveInfo, const UPID& pid, + const vector<Resource>& persistedResources, const string& version, const Future<bool>& admit) { @@ -3361,7 +3371,8 @@ void Master::_registerSlave( slaveInfo, pid, version.empty() ? Option<string>::none() : version, - Clock::now()); + Clock::now(), + persistedResources); ++metrics.slave_registrations; @@ -3380,6 +3391,7 @@ void Master::_registerSlave( void Master::reregisterSlave( const UPID& from, const SlaveInfo& slaveInfo, + const vector<Resource>& persistedResources, const vector<ExecutorInfo>& executorInfos, const vector<Task>& tasks, const vector<Archive::Framework>& completedFrameworks, @@ -3396,6 +3408,7 @@ void Master::reregisterSlave( &Self::reregisterSlave, from, slaveInfo, + persistedResources, executorInfos, tasks, completedFrameworks, @@ -3507,6 +3520,7 @@ void Master::reregisterSlave( &Self::_reregisterSlave, slaveInfo, from, + persistedResources, executorInfos, tasks, completedFrameworks, @@ -3518,6 +3532,7 @@ void Master::reregisterSlave( void Master::_reregisterSlave( const SlaveInfo& slaveInfo, const UPID& pid, + const vector<Resource>& persistedResources, const vector<ExecutorInfo>& executorInfos, const vector<Task>& tasks, const vector<Archive::Framework>& completedFrameworks, @@ -3549,6 +3564,7 @@ void Master::_reregisterSlave( pid, version.empty() ? Option<string>::none() : version, Clock::now(), + persistedResources, executorInfos, tasks); @@ -3572,6 +3588,8 @@ void Master::_reregisterSlave( void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks) { + CHECK_NOTNULL(slave); + // Send the latest framework pids to the slave. hashset<UPID> pids; foreach (const Task& task, tasks) { @@ -3585,6 +3603,18 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks) pids.insert(framework->pid); } } + + // NOTE: Here we always send the message. Slaves whose version are + // less than 0.22.0 will drop it silently which is OK. + LOG(INFO) << "Sending updated persisted resources " + << slave->persistedResources + << " to slave " << *slave; + + UpdateResourcesMessage message; + message.mutable_persisted_resources()->CopyFrom( + slave->persistedResources); + + send(slave->pid, message); } http://git-wip-us.apache.org/repos/asf/mesos/blob/cd954386/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 1d342e5..337e00a 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -163,11 +163,13 @@ public: void registerSlave( const process::UPID& from, const SlaveInfo& slaveInfo, + const std::vector<Resource>& persistedResources, const std::string& version); void reregisterSlave( const process::UPID& from, const SlaveInfo& slaveInfo, + const std::vector<Resource>& persistedResources, const std::vector<ExecutorInfo>& executorInfos, const std::vector<Task>& tasks, const std::vector<Archive::Framework>& completedFrameworks, @@ -232,6 +234,7 @@ public: void _reregisterSlave( const SlaveInfo& slaveInfo, const process::UPID& pid, + const std::vector<Resource>& persistedResources, const std::vector<ExecutorInfo>& executorInfos, const std::vector<Task>& tasks, const std::vector<Archive::Framework>& completedFrameworks, @@ -276,6 +279,7 @@ protected: void _registerSlave( const SlaveInfo& slaveInfo, const process::UPID& pid, + const std::vector<Resource>& persistedResources, const std::string& version, const process::Future<bool>& admit); @@ -730,6 +734,7 @@ struct Slave const process::UPID& _pid, const Option<std::string> _version, const process::Time& _registeredTime, + const Resources& _persistedResources, const std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(), const std::vector<Task> tasks = @@ -741,6 +746,7 @@ struct Slave registeredTime(_registeredTime), connected(true), active(true), + persistedResources(_persistedResources), observer(NULL) { CHECK(_info.has_id()); @@ -917,6 +923,12 @@ struct Slave hashmap<FrameworkID, Resources> usedResources; // Active task / executors. Resources offeredResources; // Offers. + // Resources that should be persisted by the slave (e.g. persistent + // volumes, dynamic reservations, etc). These are either in use by a + // task/executor, or are available for use and will be re-offered to + // the framework. + Resources persistedResources; + SlaveObserver* observer; private:
