Repository: mesos
Updated Branches:
  refs/heads/master 82e886c91 -> 0760b007a
Added a removeExecutor helper in the Master. Review: https://reviews.apache.org/r/25565 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b6211913 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b6211913 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b6211913 Branch: refs/heads/master Commit: b62119134ca7c5020341ed2f3a991ebfaf8355ef Parents: 82e886c Author: Benjamin Mahler <[email protected]> Authored: Wed Sep 10 14:34:30 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Sep 17 15:46:37 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 213 ++++++++++++++++++++------------------------- src/master/master.hpp | 40 +++++---- 2 files changed, 121 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b6211913/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 41dcc46..11d75fb 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -649,50 +649,29 @@ void Master::finalize() { LOG(INFO) << "Master terminating"; - // Remove the frameworks. - // Note we are not deleting the pointers to the frameworks from the - // allocator or the roles because it is unnecessary bookkeeping at - // this point since we are shutting down. - foreachvalue (Framework* framework, frameworks.registered) { - // Remove pending tasks from the framework. - framework->pendingTasks.clear(); - - // Remove pointers to the framework's tasks in slaves. - foreachvalue (Task* task, utils::copy(framework->tasks)) { - Slave* slave = getSlave(task->slave_id()); - // Since we only find out about tasks when the slave re-registers, - // it must be the case that the slave exists! 
- CHECK(slave != NULL) - << "Unknown slave " << task->slave_id() - << " in the task " << task->task_id(); - - removeTask(task); + // Remove the slaves. + foreachvalue (Slave* slave, slaves.registered) { + // Remove tasks. + foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) { + foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) { + removeTask(task); + } } - // Remove the framework's offers (if they weren't removed before). - foreach (Offer* offer, utils::copy(framework->offers)) { - removeOffer(offer); + // Remove executors. + foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) { + foreachkey (const ExecutorID& executorId, + utils::copy(slave->executors[frameworkId])) { + removeExecutor(slave, frameworkId, executorId); + } } - delete framework; - } - frameworks.registered.clear(); - - CHECK_EQ(offers.size(), 0UL); - - foreachvalue (Slave* slave, slaves.registered) { - // Remove tasks that are in the slave but not in any framework. - // This could happen when the framework has yet to re-register - // after master failover. - // NOTE: keys() and values() are used because slave->tasks is - // modified by removeTask()! - foreach (const FrameworkID& frameworkId, slave->tasks.keys()) { - foreach (Task* task, slave->tasks[frameworkId].values()) { - removeTask(task); - } + // Remove offers. + foreach (Offer* offer, utils::copy(slave->offers)) { + removeOffer(offer); } - // Kill the slave observer. + // Terminate the slave observer. terminate(slave->observer); wait(slave->observer); @@ -701,6 +680,26 @@ void Master::finalize() } slaves.registered.clear(); + // Remove the frameworks. + // Note we are not deleting the pointers to the frameworks from the + // allocator or the roles because it is unnecessary bookkeeping at + // this point since we are shutting down. + foreachvalue (Framework* framework, frameworks.registered) { + // Remove pending tasks from the framework. 
+ framework->pendingTasks.clear(); + + // No tasks/executors/offers should remain since the slaves + // have been removed. + CHECK(framework->tasks.empty()); + CHECK(framework->executors.empty()); + CHECK(framework->offers.empty()); + + delete framework; + } + frameworks.registered.clear(); + + CHECK(offers.empty()); + foreachvalue (Future<Nothing> future, authenticating) { // NOTE: This is necessary during tests because a copy of // this future is used to setup authentication timeout. If a @@ -3282,33 +3281,22 @@ void Master::exitedExecutor( Slave* slave = CHECK_NOTNULL(slaves.registered[slaveId]); - // Tell the allocator about the recovered resources. - if (slave->hasExecutor(frameworkId, executorId)) { - ExecutorInfo executor = slave->executors[frameworkId][executorId]; - - LOG(INFO) << "Executor " << executorId - << " of framework " << frameworkId - << " on slave " << *slave << " " - << WSTRINGIFY(status); - - allocator->resourcesRecovered( - frameworkId, slaveId, Resources(executor.resources()), None()); - - // Remove executor from slave and framework. - slave->removeExecutor(frameworkId, executorId); - } else { - LOG(WARNING) << "Ignoring unknown exited executor " - << executorId << " on slave " << *slave; + if (!slave->hasExecutor(frameworkId, executorId)) { + LOG(WARNING) << "Ignoring unknown exited executor '" << executorId + << "' of framework " << frameworkId + << " on slave " << *slave; + return; } - Framework* framework = getFramework(frameworkId); - if (framework != NULL) { - framework->removeExecutor(slave->id, executorId); + LOG(INFO) << "Executor " << executorId + << " of framework " << frameworkId + << " on slave " << *slave << " " + << WSTRINGIFY(status); - // TODO(benh): Send the framework its executor's exit status? - // Or maybe at least have something like - // Scheduler::executorLost? - } + removeExecutor(slave, frameworkId, executorId); + + // TODO(benh): Send the framework its executor's exit status? 
+ // Or maybe at least have something like Scheduler::executorLost? } @@ -3820,26 +3808,14 @@ void Master::reconcile( foreachkey (const ExecutorID& executorId, utils::copy(slave->executors[frameworkId])) { if (!slaveExecutors.contains(frameworkId, executorId)) { - LOG(WARNING) << "Removing executor " << executorId << " of framework " - << frameworkId << " as it is unknown to the slave " - << *slave; - - // TODO(bmahler): This is duplicated in several locations, we - // may benefit from a method for removing an executor from - // all the relevant data structures and the allocator, akin - // to removeTask(). - allocator->resourcesRecovered( - frameworkId, - slave->id, - slave->executors[frameworkId][executorId].resources(), - None()); - - slave->removeExecutor(frameworkId, executorId); - - if (frameworks.registered.contains(frameworkId)) { - frameworks.registered[frameworkId]->removeExecutor( - slave->id, executorId); - } + // TODO(bmahler): Reconcile executors correctly between the + // master and the slave, see: + // MESOS-1466, MESOS-1800, and MESOS-1720. 
+ LOG(WARNING) << "Executor " << executorId + << " of framework " << frameworkId + << " possibly unknown to the slave " << *slave; + + removeExecutor(slave, frameworkId, executorId); } } } @@ -4045,15 +4021,9 @@ void Master::removeFramework(Framework* framework) foreachkey (const SlaveID& slaveId, framework->executors) { Slave* slave = getSlave(slaveId); if (slave != NULL) { - foreachpair (const ExecutorID& executorId, - const ExecutorInfo& executorInfo, - framework->executors[slaveId]) { - allocator->resourcesRecovered( - framework->id, - slave->id, - executorInfo.resources(), - None()); - slave->removeExecutor(framework->id, executorId); + foreachkey (const ExecutorID& executorId, + utils::copy(framework->executors[slaveId])) { + removeExecutor(slave, framework->id, executorId); } } } @@ -4134,14 +4104,7 @@ void Master::removeFramework(Slave* slave, Framework* framework) if (slave->executors.contains(framework->id)) { foreachkey (const ExecutorID& executorId, utils::copy(slave->executors[framework->id])) { - allocator->resourcesRecovered( - framework->id, - slave->id, - slave->executors[framework->id][executorId].resources(), - None()); - - framework->removeExecutor(slave->id, executorId); - slave->removeExecutor(framework->id, executorId); + removeExecutor(slave, framework->id, executorId); } } } @@ -4313,6 +4276,14 @@ void Master::removeSlave(Slave* slave) } } + // Remove executors from the slave for proper resource accounting. + foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) { + foreachkey (const ExecutorID& executorId, + utils::copy(slave->executors[frameworkId])) { + removeExecutor(slave, frameworkId, executorId); + } + } + foreach (Offer* offer, utils::copy(slave->offers)) { // TODO(vinod): We don't need to call 'Allocator::resourcesRecovered' // once MESOS-621 is fixed. @@ -4323,24 +4294,6 @@ void Master::removeSlave(Slave* slave) removeOffer(offer, true); // Rescind! 
} - // Remove executors from the slave for proper resource accounting. - foreachkey (const FrameworkID& frameworkId, slave->executors) { - Framework* framework = getFramework(frameworkId); - if (framework != NULL) { - foreachkey (const ExecutorID& executorId, slave->executors[frameworkId]) { - // TODO(vinod): We don't need to call 'Allocator::resourcesRecovered' - // once MESOS-621 is fixed. - allocator->resourcesRecovered( - frameworkId, - slave->id, - slave->executors[frameworkId][executorId].resources(), - None()); - - framework->removeExecutor(slave->id, executorId); - } - } - } - // Mark the slave as being removed. slaves.removing.insert(slave->id); slaves.registered.erase(slave->id); @@ -4455,6 +4408,32 @@ void Master::removeTask(Task* task) } +void Master::removeExecutor( + Slave* slave, + const FrameworkID& frameworkId, + const ExecutorID& executorId) +{ + CHECK_NOTNULL(slave); + CHECK(slave->hasExecutor(frameworkId, executorId)); + + ExecutorInfo executor = slave->executors[frameworkId][executorId]; + + LOG(INFO) << "Removing executor '" << executorId + << "' with resources " << executor.resources() + << " of framework " << frameworkId << " on slave " << *slave; + + allocator->resourcesRecovered( + frameworkId, slave->id, executor.resources(), None()); + + Framework* framework = getFramework(frameworkId); + if (framework != NULL) { // The framework might not be re-registered yet. 
+ framework->removeExecutor(slave->id, executorId); + } + + slave->removeExecutor(frameworkId, executorId); +} + + void Master::offerTimeout(const OfferID& offerId) { Offer* offer = getOffer(offerId); http://git-wip-us.apache.org/repos/asf/mesos/blob/b6211913/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index b492600..54e3918 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -368,9 +368,15 @@ protected: const Filters& filters, const process::Future<std::list<process::Future<Option<Error> > > >& f); - // Remove a task. + // Remove a task and recover its resources. void removeTask(Task* task); + // Remove an executor and recover its resources. + void removeExecutor( + Slave* slave, + const FrameworkID& frameworkId, + const ExecutorID& executorId); + // Forwards the update to the framework. void forward( const StatusUpdate& update, @@ -897,14 +903,15 @@ struct Slave void removeExecutor(const FrameworkID& frameworkId, const ExecutorID& executorId) { - if (hasExecutor(frameworkId, executorId)) { - // Update the resources in use to reflect removing this executor. - resourcesInUse -= executors[frameworkId][executorId].resources(); + CHECK(hasExecutor(frameworkId, executorId)) + << "Unknown executor " << executorId << " of framework " << frameworkId; - executors[frameworkId].erase(executorId); - if (executors[frameworkId].size() == 0) { - executors.erase(frameworkId); - } + // Update the resources in use to reflect removing this executor. + resourcesInUse -= executors[frameworkId][executorId].resources(); + + executors[frameworkId].erase(executorId); + if (executors[frameworkId].empty()) { + executors.erase(frameworkId); } } @@ -1047,14 +1054,17 @@ struct Framework void removeExecutor(const SlaveID& slaveId, const ExecutorID& executorId) { - if (hasExecutor(slaveId, executorId)) { - // Update our resources to reflect removing this executor. 
- resources -= executors[slaveId][executorId].resources(); + CHECK(hasExecutor(slaveId, executorId)) + << "Unknown executor " << executorId + << " of framework " << id + << " of slave " << slaveId; - executors[slaveId].erase(executorId); - if (executors[slaveId].size() == 0) { - executors.erase(slaveId); - } + // Update our resources to reflect removing this executor. + resources -= executors[slaveId][executorId].resources(); + + executors[slaveId].erase(executorId); + if (executors[slaveId].empty()) { + executors.erase(slaveId); } }
