Minor cleanups to the Master code.

Review: https://reviews.apache.org/r/25566
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0760b007 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0760b007 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0760b007 Branch: refs/heads/master Commit: 0760b007ad65bc91e8cea377339978c78d36d247 Parents: b621191 Author: Benjamin Mahler <[email protected]> Authored: Thu Sep 11 10:48:20 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Sep 17 15:46:38 2014 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 2 +- src/master/master.cpp | 53 +++++++++++++++++++++++++++------------------- src/master/master.hpp | 36 +++++++++++++++++-------------- 3 files changed, 52 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 6dd11fe..8db4d9a 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -405,7 +405,7 @@ Future<Response> Master::Http::stats(const Request& request) totalResources += resource; } } - foreach (const Resource& resource, slave->resourcesInUse) { + foreach (const Resource& resource, slave->used()) { if (resource.type() == Value::SCALAR) { usedResources += resource; } http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 11d75fb..52a7409 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3763,10 +3763,10 @@ void Master::reconcile( // missing from the slave. This could happen if the task was // dropped by the slave (e.g., slave exited before getting the // task or the task was launched while slave was in recovery). 
- // NOTE: keys() and values() are used since statusUpdate() - // modifies slave->tasks. - foreach (const FrameworkID& frameworkId, slave->tasks.keys()) { - foreach (Task* task, slave->tasks[frameworkId].values()) { + // NOTE: copies are used because statusUpdate() modifies + // slave->tasks. + foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) { + foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) { if (!slaveTasks.contains(task->framework_id(), task->task_id())) { LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id() << " of framework " << task->framework_id() @@ -4010,10 +4010,11 @@ void Master::removeFramework(Framework* framework) // Remove the framework's offers (if they weren't removed before). foreach (Offer* offer, utils::copy(framework->offers)) { - allocator->resourcesRecovered(offer->framework_id(), - offer->slave_id(), - Resources(offer->resources()), - None()); + allocator->resourcesRecovered( + offer->framework_id(), + offer->slave_id(), + offer->resources(), + None()); removeOffer(offer); } @@ -4078,9 +4079,9 @@ void Master::removeFramework(Slave* slave, Framework* framework) // Remove pointers to framework's tasks in slaves, and send status // updates. - // NOTE: values() is used because statusUpdate() modifies + // NOTE: A copy is used because statusUpdate() modifies // slave->tasks. - foreach (Task* task, slave->tasks[framework->id].values()) { + foreachvalue (Task* task, utils::copy(slave->tasks[framework->id])) { // Remove tasks that belong to this framework. if (task->framework_id() == framework->id) { // A framework might not actually exist because the master failed @@ -4257,8 +4258,8 @@ void Master::removeSlave(Slave* slave) // updates. Rather, build up the updates so that we can send them // after the slave is removed from the registry. 
vector<StatusUpdate> updates; - foreach (const FrameworkID& frameworkId, slave->tasks.keys()) { - foreach (Task* task, slave->tasks[frameworkId].values()) { + foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) { + foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) { const StatusUpdate& update = protobuf::createStatusUpdate( task->framework_id(), task->slave_id(), @@ -4372,6 +4373,21 @@ void Master::removeTask(Task* task) { CHECK_NOTNULL(task); + Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id())); + + if (!protobuf::isTerminalState(task->state())) { + LOG(WARNING) << "Removing task " << task->task_id() + << " with resources " << task->resources() + << " of framework " << task->framework_id() + << " on slave " << *slave + << " in non-terminal state " << task->state(); + } else { + LOG(INFO) << "Removing task " << task->task_id() + << " with resources " << task->resources() + << " of framework " << task->framework_id() + << " on slave " << *slave; + } + // Remove from framework. Framework* framework = getFramework(task->framework_id()); if (framework != NULL) { // A framework might not be re-connected yet. @@ -4379,15 +4395,13 @@ void Master::removeTask(Task* task) } // Remove from slave. - Slave* slave = getSlave(task->slave_id()); - CHECK_NOTNULL(slave); slave->removeTask(task); // Tell the allocator about the recovered resources. allocator->resourcesRecovered( task->framework_id(), task->slave_id(), - Resources(task->resources()), + task->resources(), None()); // Update the task state metric. 
@@ -4396,12 +4410,7 @@ void Master::removeTask(Task* task) case TASK_FAILED: ++metrics.tasks_failed; break; case TASK_KILLED: ++metrics.tasks_killed; break; case TASK_LOST: ++metrics.tasks_lost; break; - default: - LOG(WARNING) << "Removing task " << task->task_id() - << " of framework " << task->framework_id() - << " and slave " << task->slave_id() - << " in non-terminal state " << task->state(); - break; + default: break; } delete task; @@ -4934,7 +4943,7 @@ double Master::_resources_used(const std::string& name) double used = 0.0; foreachvalue (Slave* slave, slaves.registered) { - foreach (const Resource& resource, slave->resourcesInUse) { + foreach (const Resource& resource, slave->used()) { if (resource.name() == name && resource.type() == Value::SCALAR) { used += resource.scalar().value(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 54e3918..80d7535 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -839,7 +839,6 @@ struct Slave LOG(INFO) << "Adding task " << task->task_id() << " with resources " << task->resources() << " on slave " << id << " (" << info.hostname() << ")"; - resourcesInUse += task->resources(); } void removeTask(Task* task) @@ -854,10 +853,6 @@ struct Slave } killedTasks.remove(task->framework_id(), task->task_id()); - LOG(INFO) << "Removing task " << task->task_id() - << " with resources " << task->resources() - << " on slave " << id << " (" << info.hostname() << ")"; - resourcesInUse -= task->resources(); } void addOffer(Offer* offer) @@ -867,7 +862,6 @@ struct Slave VLOG(1) << "Adding offer " << offer->id() << " with resources " << offer->resources() << " on slave " << id << " (" << info.hostname() << ")"; - resourcesOffered += offer->resources(); } void removeOffer(Offer* offer) @@ -877,7 +871,6 @@ struct Slave VLOG(1) << "Removing offer " << 
offer->id() << " with resources " << offer->resources() << " on slave " << id << " (" << info.hostname() << ")"; - resourcesOffered -= offer->resources(); } bool hasExecutor(const FrameworkID& frameworkId, @@ -895,9 +888,6 @@ struct Slave << " of framework " << frameworkId; executors[frameworkId][executorInfo.executor_id()] = executorInfo; - - // Update the resources in use to reflect running this executor. - resourcesInUse += executorInfo.resources(); } void removeExecutor(const FrameworkID& frameworkId, @@ -906,15 +896,32 @@ struct Slave CHECK(hasExecutor(frameworkId, executorId)) << "Unknown executor " << executorId << " of framework " << frameworkId; - // Update the resources in use to reflect removing this executor. - resourcesInUse -= executors[frameworkId][executorId].resources(); - executors[frameworkId].erase(executorId); if (executors[frameworkId].empty()) { executors.erase(frameworkId); } } + Resources used() const + { + Resources used; + + foreachkey (const FrameworkID& frameworkId, tasks) { + foreachvalue (const Task* task, tasks.find(frameworkId)->second) { + used += task->resources(); + } + } + + foreachkey (const FrameworkID& frameworkId, executors) { + foreachvalue (const ExecutorInfo& executorInfo, + executors.find(frameworkId)->second) { + used += executorInfo.resources(); + } + } + + return used; + } + const SlaveID id; const SlaveInfo info; @@ -927,9 +934,6 @@ struct Slave // enabled because we expect it reregister after recovery. bool disconnected; - Resources resourcesOffered; // Resources offered. - Resources resourcesInUse; // Resources used by tasks and executors. - // Executors running on this slave. hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > executors;
