Maintained persisted resources in master memory.

Review: https://reviews.apache.org/r/28781


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cd954386
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cd954386
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cd954386

Branch: refs/heads/master
Commit: cd9543864c6062ebe05714762116adc20ce02bae
Parents: 19f6e17
Author: Jie Yu <[email protected]>
Authored: Wed Jan 28 16:10:26 2015 -0800
Committer: Jie Yu <[email protected]>
Committed: Thu Jan 29 11:43:23 2015 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 34 ++++++++++++++++++++++++++++++++--
 src/master/master.hpp | 12 ++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cd954386/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index bedafaf..1005686 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -557,11 +557,13 @@ void Master::initialize()
   install<RegisterSlaveMessage>(
       &Master::registerSlave,
       &RegisterSlaveMessage::slave,
+      &RegisterSlaveMessage::persisted_resources,
       &RegisterSlaveMessage::version);
 
   install<ReregisterSlaveMessage>(
       &Master::reregisterSlave,
       &ReregisterSlaveMessage::slave,
+      &ReregisterSlaveMessage::persisted_resources,
       &ReregisterSlaveMessage::executor_infos,
       &ReregisterSlaveMessage::tasks,
       &ReregisterSlaveMessage::completed_frameworks,
@@ -3251,6 +3253,7 @@ void Master::schedulerMessage(
 void Master::registerSlave(
     const UPID& from,
     const SlaveInfo& slaveInfo,
+    const vector<Resource>& persistedResources,
     const string& version)
 {
   ++metrics.messages_register_slave;
@@ -3260,7 +3263,12 @@ void Master::registerSlave(
               << " because authentication is still in progress";
 
     authenticating[from]
-      .onReady(defer(self(), &Self::registerSlave, from, slaveInfo, version));
+      .onReady(defer(self(),
+                     &Self::registerSlave,
+                     from,
+                     slaveInfo,
+                     persistedResources,
+                     version));
     return;
   }
 
@@ -3325,6 +3333,7 @@ void Master::registerSlave(
                  &Self::_registerSlave,
                  slaveInfo_,
                  from,
+                 persistedResources,
                  version,
                  lambda::_1));
 }
@@ -3333,6 +3342,7 @@ void Master::registerSlave(
 void Master::_registerSlave(
     const SlaveInfo& slaveInfo,
     const UPID& pid,
+    const vector<Resource>& persistedResources,
     const string& version,
     const Future<bool>& admit)
 {
@@ -3361,7 +3371,8 @@ void Master::_registerSlave(
         slaveInfo,
         pid,
         version.empty() ? Option<string>::none() : version,
-        Clock::now());
+        Clock::now(),
+        persistedResources);
 
     ++metrics.slave_registrations;
 
@@ -3380,6 +3391,7 @@ void Master::_registerSlave(
 void Master::reregisterSlave(
     const UPID& from,
     const SlaveInfo& slaveInfo,
+    const vector<Resource>& persistedResources,
     const vector<ExecutorInfo>& executorInfos,
     const vector<Task>& tasks,
     const vector<Archive::Framework>& completedFrameworks,
@@ -3396,6 +3408,7 @@ void Master::reregisterSlave(
                      &Self::reregisterSlave,
                      from,
                      slaveInfo,
+                     persistedResources,
                      executorInfos,
                      tasks,
                      completedFrameworks,
@@ -3507,6 +3520,7 @@ void Master::reregisterSlave(
                  &Self::_reregisterSlave,
                  slaveInfo,
                  from,
+                 persistedResources,
                  executorInfos,
                  tasks,
                  completedFrameworks,
@@ -3518,6 +3532,7 @@ void Master::reregisterSlave(
 void Master::_reregisterSlave(
     const SlaveInfo& slaveInfo,
     const UPID& pid,
+    const vector<Resource>& persistedResources,
     const vector<ExecutorInfo>& executorInfos,
     const vector<Task>& tasks,
     const vector<Archive::Framework>& completedFrameworks,
@@ -3549,6 +3564,7 @@ void Master::_reregisterSlave(
         pid,
         version.empty() ? Option<string>::none() : version,
         Clock::now(),
+        persistedResources,
         executorInfos,
         tasks);
 
@@ -3572,6 +3588,8 @@ void Master::_reregisterSlave(
 
 void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks)
 {
+  CHECK_NOTNULL(slave);
+
   // Send the latest framework pids to the slave.
   hashset<UPID> pids;
   foreach (const Task& task, tasks) {
@@ -3585,6 +3603,18 @@ void Master::__reregisterSlave(Slave* slave, const 
vector<Task>& tasks)
       pids.insert(framework->pid);
     }
   }
+
+  // NOTE: Here we always send the message. Slaves whose version are
+  // less than 0.22.0 will drop it silently which is OK.
+  LOG(INFO) << "Sending updated persisted resources "
+            << slave->persistedResources
+            << " to slave " << *slave;
+
+  UpdateResourcesMessage message;
+  message.mutable_persisted_resources()->CopyFrom(
+      slave->persistedResources);
+
+  send(slave->pid, message);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/cd954386/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1d342e5..337e00a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -163,11 +163,13 @@ public:
   void registerSlave(
       const process::UPID& from,
       const SlaveInfo& slaveInfo,
+      const std::vector<Resource>& persistedResources,
       const std::string& version);
 
   void reregisterSlave(
       const process::UPID& from,
       const SlaveInfo& slaveInfo,
+      const std::vector<Resource>& persistedResources,
       const std::vector<ExecutorInfo>& executorInfos,
       const std::vector<Task>& tasks,
       const std::vector<Archive::Framework>& completedFrameworks,
@@ -232,6 +234,7 @@ public:
   void _reregisterSlave(
       const SlaveInfo& slaveInfo,
       const process::UPID& pid,
+      const std::vector<Resource>& persistedResources,
       const std::vector<ExecutorInfo>& executorInfos,
       const std::vector<Task>& tasks,
       const std::vector<Archive::Framework>& completedFrameworks,
@@ -276,6 +279,7 @@ protected:
   void _registerSlave(
       const SlaveInfo& slaveInfo,
       const process::UPID& pid,
+      const std::vector<Resource>& persistedResources,
       const std::string& version,
       const process::Future<bool>& admit);
 
@@ -730,6 +734,7 @@ struct Slave
         const process::UPID& _pid,
         const Option<std::string> _version,
         const process::Time& _registeredTime,
+        const Resources& _persistedResources,
         const std::vector<ExecutorInfo> executorInfos =
           std::vector<ExecutorInfo>(),
         const std::vector<Task> tasks =
@@ -741,6 +746,7 @@ struct Slave
       registeredTime(_registeredTime),
       connected(true),
       active(true),
+      persistedResources(_persistedResources),
       observer(NULL)
   {
     CHECK(_info.has_id());
@@ -917,6 +923,12 @@ struct Slave
   hashmap<FrameworkID, Resources> usedResources;  // Active task / executors.
   Resources offeredResources; // Offers.
 
+  // Resources that should be persisted by the slave (e.g. persistent
+  // volumes, dynamic reservations, etc). These are either in use by a
+  // task/executor, or are available for use and will be re-offered to
+  // the framework.
+  Resources persistedResources;
+
   SlaveObserver* observer;
 
 private:

Reply via email to