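Eliminated redundant resource accounting in the master: Slave::usedResources is now tracked per framework (hashmap<FrameworkID, Resources>) and passed directly to allocator->slaveAdded(), instead of Master::readdSlave() recomputing the per-framework resources from the re-registered executors and tasks. Review: https://reviews.apache.org/r/26199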
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1befdce Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1befdce Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1befdce Branch: refs/heads/master Commit: e1befdcee8edbab4ccb33d139f714b7e0e954dd8 Parents: c866484 Author: Benjamin Mahler <[email protected]> Authored: Fri Sep 26 16:19:09 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Oct 8 11:45:11 2014 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 8 ++++--- src/master/master.cpp | 24 ++++++------------- src/master/master.hpp | 60 +++++++++++++++++++++++++++++----------------- 3 files changed, 50 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e1befdce/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 3fd4b45..a5e34cc 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -415,9 +415,11 @@ Future<Response> Master::Http::stats(const Request& request) totalResources += resource; } } - foreach (const Resource& resource, slave->usedResources) { - if (resource.type() == Value::SCALAR) { - usedResources += resource; + foreachvalue (const Resources& resources, slave->usedResources) { + foreach (const Resource& resource, resources) { + if (resource.type() == Value::SCALAR) { + usedResources += resource; + } } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/e1befdce/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index e2c58d1..79588da 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -4212,8 +4212,7 @@ void Master::addSlave(Slave* slave, bool reregister) spawn(slave->observer); if 
(!reregister) { - allocator->slaveAdded( - slave->id, slave->info, hashmap<FrameworkID, Resources>()); + allocator->slaveAdded(slave->id, slave->info, slave->usedResources); } } @@ -4228,10 +4227,6 @@ void Master::readdSlave( addSlave(slave, true); - // Add the executors and tasks to the slave and framework state and - // determine the resources that have been allocated to frameworks. - hashmap<FrameworkID, Resources> resources; - foreach (const ExecutorInfo& executorInfo, executorInfos) { // TODO(bmahler): ExecutorInfo.framework_id is set by the Scheduler // Driver in 0.14.0. Therefore, in 0.15.0, the slave no longer needs @@ -4246,8 +4241,6 @@ void Master::readdSlave( if (framework != NULL) { // The framework might not be re-registered yet. framework->addExecutor(slave->id, executorInfo); } - - resources[executorInfo.framework_id()] += executorInfo.resources(); } foreach (const Task& task, tasks) { @@ -4267,11 +4260,6 @@ void Master::readdSlave( << " of framework " << task.framework_id() << " running on slave " << *slave; } - - // Terminal tasks do not consume resoures. - if (!protobuf::isTerminalState(task.state())) { - resources[task.framework_id()] += task.resources(); - } } // Re-add completed tasks reported by the slave. 
@@ -4300,7 +4288,7 @@ void Master::readdSlave( } } - allocator->slaveAdded(slave->id, slave->info, resources); + allocator->slaveAdded(slave->id, slave->info, slave->usedResources); } @@ -5146,9 +5134,11 @@ double Master::_resources_used(const std::string& name) double used = 0.0; foreachvalue (Slave* slave, slaves.registered) { - foreach (const Resource& resource, slave->usedResources) { - if (resource.name() == name && resource.type() == Value::SCALAR) { - used += resource.scalar().value(); + foreachvalue (const Resources& resources, slave->usedResources) { + foreach (const Resource& resource, resources) { + if (resource.name() == name && resource.type() == Value::SCALAR) { + used += resource.scalar().value(); + } } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/e1befdce/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 5c0f224..5cafae3 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -838,17 +838,19 @@ struct Slave void addTask(Task* task) { - CHECK(!tasks[task->framework_id()].contains(task->task_id())) - << "Duplicate task " << task->task_id() - << " of framework " << task->framework_id(); + const TaskID& taskId = task->task_id(); + const FrameworkID& frameworkId = task->framework_id(); + + CHECK(!tasks[frameworkId].contains(taskId)) + << "Duplicate task " << taskId << " of framework " << frameworkId; - tasks[task->framework_id()][task->task_id()] = task; + tasks[frameworkId][taskId] = task; if (!protobuf::isTerminalState(task->state())) { - usedResources += task->resources(); + usedResources[frameworkId] += task->resources(); } - LOG(INFO) << "Adding task " << task->task_id() + LOG(INFO) << "Adding task " << taskId << " with resources " << task->resources() << " on slave " << id << " (" << info.hostname() << ")"; } @@ -859,30 +861,40 @@ struct Slave // functionally for all tasks is expensive, for now. 
void taskTerminated(Task* task) { + const TaskID& taskId = task->task_id(); + const FrameworkID& frameworkId = task->framework_id(); + CHECK(protobuf::isTerminalState(task->state())); - CHECK(tasks[task->framework_id()].contains(task->task_id())) - << "Unknown task " << task->task_id() - << " of framework " << task->framework_id(); + CHECK(tasks[frameworkId].contains(taskId)) + << "Unknown task " << taskId << " of framework " << frameworkId; - usedResources -= task->resources(); + usedResources[frameworkId] -= task->resources(); + if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) { + usedResources.erase(frameworkId); + } } void removeTask(Task* task) { - CHECK(tasks[task->framework_id()].contains(task->task_id())) - << "Unknown task " << task->task_id() - << " of framework " << task->framework_id(); + const TaskID& taskId = task->task_id(); + const FrameworkID& frameworkId = task->framework_id(); + + CHECK(tasks[frameworkId].contains(taskId)) + << "Unknown task " << taskId << " of framework " << frameworkId; if (!protobuf::isTerminalState(task->state())) { - usedResources -= task->resources(); + usedResources[frameworkId] -= task->resources(); + if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) { + usedResources.erase(frameworkId); + } } - tasks[task->framework_id()].erase(task->task_id()); - if (tasks[task->framework_id()].empty()) { - tasks.erase(task->framework_id()); + tasks[frameworkId].erase(taskId); + if (tasks[frameworkId].empty()) { + tasks.erase(frameworkId); } - killedTasks.remove(task->framework_id(), task->task_id()); + killedTasks.remove(frameworkId, taskId); } void addOffer(Offer* offer) @@ -916,7 +928,7 @@ struct Slave << " of framework " << frameworkId; executors[frameworkId][executorInfo.executor_id()] = executorInfo; - usedResources += executorInfo.resources(); + usedResources[frameworkId] += executorInfo.resources(); } void removeExecutor(const FrameworkID& frameworkId, @@ -925,7 +937,11 @@ struct Slave 
CHECK(hasExecutor(frameworkId, executorId)) << "Unknown executor " << executorId << " of framework " << frameworkId; - usedResources -= executors[frameworkId][executorId].resources(); + usedResources[frameworkId] -= + executors[frameworkId][executorId].resources(); + + // XXX Remove. + executors[frameworkId].erase(executorId); if (executors[frameworkId].empty()) { executors.erase(frameworkId); @@ -964,8 +980,8 @@ struct Slave // Active offers on this slave. hashset<Offer*> offers; - Resources usedResources; // Active task / executor resources. - Resources offeredResources; // Offered resources. + hashmap<FrameworkID, Resources> usedResources; // Active task / executors. + Resources offeredResources; // Offers. SlaveObserver* observer;
