Removed the need for Master::readdSlave. Review: https://reviews.apache.org/r/26204
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a4a0d158 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a4a0d158 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a4a0d158 Branch: refs/heads/master Commit: a4a0d1580815360083a46b43d458c5babd58c632 Parents: ee4f879 Author: Benjamin Mahler <[email protected]> Authored: Mon Sep 29 14:07:40 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Oct 8 11:45:12 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 105 +++++++++++++++++++-------------------------- src/master/master.hpp | 24 ++++++++--- 2 files changed, 60 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index e445c86..26cd29a 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2993,11 +2993,16 @@ void Master::_registerSlave( version.empty() ? Option<string>::none() : version, Clock::now()); - LOG(INFO) << "Registered slave " << *slave - << " with " << slave->info.resources(); ++metrics.slave_registrations; addSlave(slave); + + SlaveRegisteredMessage message; + message.mutable_slave_id()->MergeFrom(slave->id); + send(slave->pid, message); + + LOG(INFO) << "Registered slave " << *slave + << " with " << slave->info.resources(); } } @@ -3188,15 +3193,22 @@ void Master::_reregisterSlave( slaveInfo, pid, version.empty() ? Option<string>::none() : version, - Clock::now()); + Clock::now(), + executorInfos, + tasks); slave->reregisteredTime = Clock::now(); - LOG(INFO) << "Re-registered slave " << *slave - << " with " << slave->info.resources(); ++metrics.slave_reregistrations; - readdSlave(slave, executorInfos, tasks, completedFrameworks); + addSlave(slave, completedFrameworks); + + SlaveReregisteredMessage message; + message.mutable_slave_id()->MergeFrom(slave->id); + send(slave->pid, message); + + LOG(INFO) << "Re-registered slave " << *slave + << " with " << slave->info.resources(); __reregisterSlave(slave, tasks); } @@ -4214,7 +4226,9 @@ void Master::removeFramework(Slave* slave, Framework* framework) } -void Master::addSlave(Slave* slave, bool reregister) +void Master::addSlave( + Slave* slave, + const vector<Archive::Framework>& completedFrameworks) { CHECK_NOTNULL(slave); @@ -4223,70 +4237,37 @@ void Master::addSlave(Slave* slave, bool reregister) link(slave->pid); - if (!reregister) { - SlaveRegisteredMessage message; - message.mutable_slave_id()->MergeFrom(slave->id); - send(slave->pid, message); - } else { - SlaveReregisteredMessage message; - message.mutable_slave_id()->MergeFrom(slave->id); - send(slave->pid, message); - } - // Set up an observer for the slave. slave->observer = new SlaveObserver( slave->pid, slave->info, slave->id, self()); spawn(slave->observer); - if (!reregister) { - allocator->slaveAdded(slave->id, slave->info, slave->usedResources); - } -} - - -void Master::readdSlave( - Slave* slave, - const vector<ExecutorInfo>& executorInfos, - const vector<Task>& tasks, - const vector<Archive::Framework>& completedFrameworks) -{ - CHECK_NOTNULL(slave); - - addSlave(slave, true); - - foreach (const ExecutorInfo& executorInfo, executorInfos) { - // TODO(bmahler): ExecutorInfo.framework_id is set by the Scheduler - // Driver in 0.14.0. Therefore, in 0.15.0, the slave no longer needs - // to set it, and we could remove this CHECK if desired. - CHECK(executorInfo.has_framework_id()) - << "Executor " << executorInfo.executor_id() - << " doesn't have frameworkId set"; - - slave->addExecutor(executorInfo.framework_id(), executorInfo); - - Framework* framework = getFramework(executorInfo.framework_id()); - if (framework != NULL) { // The framework might not be re-registered yet. - framework->addExecutor(slave->id, executorInfo); + // Add the slave's executors to the frameworks. + foreachkey (const FrameworkID& frameworkId, slave->executors) { + foreachvalue (const ExecutorInfo& executorInfo, + slave->executors[frameworkId]) { + Framework* framework = getFramework(frameworkId); + if (framework != NULL) { // The framework might not be re-registered yet. + framework->addExecutor(slave->id, executorInfo); + } } } - foreach (const Task& task, tasks) { - Task* t = new Task(task); - - // Add the task to the slave. - slave->addTask(t); - - Framework* framework = getFramework(task.framework_id()); - if (framework != NULL) { // The framework might not be re-registered yet. - framework->addTask(t); - } else { - // TODO(benh): We should really put a timeout on how long we - // keep tasks running on a slave that never have frameworks - // reregister and claim them. - LOG(WARNING) << "Possibly orphaned task " << task.task_id() - << " of framework " << task.framework_id() - << " running on slave " << *slave; + // Add the slave's tasks to the frameworks. + foreachkey (const FrameworkID& frameworkId, slave->tasks) { + foreachvalue (Task* task, slave->tasks[frameworkId]) { + Framework* framework = getFramework(task->framework_id()); + if (framework != NULL) { // The framework might not be re-registered yet. + framework->addTask(task); + } else { + // TODO(benh): We should really put a timeout on how long we + // keep tasks running on a slave that never have frameworks + // reregister and claim them. + LOG(WARNING) << "Possibly orphaned task " << task->task_id() + << " of framework " << task->framework_id() + << " running on slave " << *slave; + } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index e97d213..37ce31a 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -335,13 +335,10 @@ protected: void deactivate(Slave* slave); // Add a slave. - void addSlave(Slave* slave, bool reregister = false); - - void readdSlave( + void addSlave( Slave* slave, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<Task>& tasks, - const std::vector<Archive::Framework>& completedFrameworks); + const std::vector<Archive::Framework>& completedFrameworks = + std::vector<Archive::Framework>()); // Remove the slave from the registrar and from the master's state. void removeSlave(Slave* slave); @@ -820,7 +817,11 @@ struct Slave Slave(const SlaveInfo& _info, const process::UPID& _pid, const Option<std::string> _version, - const process::Time& _registeredTime) + const process::Time& _registeredTime, + const std::vector<ExecutorInfo> executorInfos = + std::vector<ExecutorInfo>(), + const std::vector<Task> tasks = + std::vector<Task>()) : id(_info.id()), info(_info), pid(_pid), @@ -831,6 +832,15 @@ struct Slave observer(NULL) { CHECK(_info.has_id()); + + foreach (const ExecutorInfo& executorInfo, executorInfos) { + CHECK(executorInfo.has_framework_id()); + addExecutor(executorInfo.framework_id(), executorInfo); + } + + foreach (const Task& task, tasks) { + addTask(new Task(task)); + } } ~Slave() {}
