Restructured the Slave::tasks map into a nested FrameworkID -> TaskID -> Task* map. Review: https://reviews.apache.org/r/14438
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/859a1a87 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/859a1a87 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/859a1a87 Branch: refs/heads/master Commit: 859a1a87a40a08df6b7c41367a21bf354bd18389 Parents: 4e14e31 Author: Benjamin Mahler <bmah...@twitter.com> Authored: Tue Oct 1 10:33:20 2013 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Wed Oct 2 12:20:32 2013 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 127 +++++++++++++++++++++++++-------------------- src/master/master.hpp | 28 +++++----- 2 files changed, 82 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/859a1a87/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 6f6d66c..ce8365f 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -240,8 +240,12 @@ Master::~Master() // Remove tasks that are in the slave but not in any framework. // This could happen when the framework has yet to reregister // after master failover. - foreachvalue (Task* task, utils::copy(slave->tasks)) { - removeTask(task); + // NOTE: keys() and values() are used because slave->tasks is + // modified by removeTask()! + foreach (const FrameworkID& frameworkId, slave->tasks.keys()) { + foreach (Task* task, slave->tasks[frameworkId].values()) { + removeTask(task); + } } // Kill the slave observer. @@ -569,13 +573,8 @@ void Master::exited(const UPID& pid) // If a slave is checkpointing, remove all non-checkpointing // frameworks from the slave. // First, collect all the frameworks running on this slave. 
- hashset<FrameworkID> frameworkIds; - foreachvalue (Task* task, slave->tasks) { - frameworkIds.insert(task->framework_id()); - } - foreachkey (const FrameworkID& frameworkId, slave->executors) { - frameworkIds.insert(frameworkId); - } + hashset<FrameworkID> frameworkIds = + slave->tasks.keys() | slave->executors.keys(); // Now, remove all the non-checkpointing frameworks. foreach (const FrameworkID& frameworkId, frameworkIds) { @@ -783,20 +782,22 @@ void Master::reregisterFramework(const FrameworkInfo& frameworkInfo, // Add any running tasks reported by slaves for this framework. foreachvalue (Slave* slave, slaves) { - foreachvalue (Task* task, slave->tasks) { - if (framework->id == task->framework_id()) { - framework->addTask(task); - // Also add the task's executor for resource accounting. - if (task->has_executor_id()) { - if (!framework->hasExecutor(slave->id, task->executor_id())) { - CHECK(slave->hasExecutor(framework->id, task->executor_id())) - << "Unknown executor " << task->executor_id() - << " of framework " << framework->id - << " for the task " << task->task_id(); - - const ExecutorInfo& executorInfo = - slave->executors[framework->id][task->executor_id()]; - framework->addExecutor(slave->id, executorInfo); + foreachkey (const FrameworkID& frameworkId, slave->tasks) { + foreachvalue (Task* task, slave->tasks[frameworkId]) { + if (framework->id == task->framework_id()) { + framework->addTask(task); + // Also add the task's executor for resource accounting. 
+ if (task->has_executor_id()) { + if (!framework->hasExecutor(slave->id, task->executor_id())) { + CHECK(slave->hasExecutor(framework->id, task->executor_id())) + << "Unknown executor " << task->executor_id() + << " of framework " << framework->id + << " for the task " << task->task_id(); + + const ExecutorInfo& executorInfo = + slave->executors[framework->id][task->executor_id()]; + framework->addExecutor(slave->id, executorInfo); + } } } } @@ -1856,21 +1857,25 @@ void Master::reconcile( // missing from the slave. This could happen if the task was // dropped by the slave (e.g., slave exited before getting the // task or the task was launched while slave was in recovery). - foreachvalue (Task* task, utils::copy(slave->tasks)) { - if (!slaveTasks.contains(task->framework_id(), task->task_id())) { - LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id() - << " of framework " << task->framework_id() - << " unknown to the slave " << slave->id - << " (" << slave->info.hostname() << ")"; + // NOTE: keys() and values() are used since statusUpdate() + // modifies slave->tasks. 
+ foreach (const FrameworkID& frameworkId, slave->tasks.keys()) { + foreach (Task* task, slave->tasks[frameworkId].values()) { + if (!slaveTasks.contains(task->framework_id(), task->task_id())) { + LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id() + << " of framework " << task->framework_id() + << " unknown to the slave " << slave->id + << " (" << slave->info.hostname() << ")"; - const StatusUpdate& update = protobuf::createStatusUpdate( - task->framework_id(), - slave->id, - task->task_id(), - TASK_LOST, - "Task is unknown to the slave"); + const StatusUpdate& update = protobuf::createStatusUpdate( + task->framework_id(), + slave->id, + task->task_id(), + TASK_LOST, + "Task is unknown to the slave"); - statusUpdate(update, UPID()); + statusUpdate(update, UPID()); + } } } @@ -2118,8 +2123,11 @@ void Master::removeFramework(Slave* slave, Framework* framework) << " from slave " << slave->id << " (" << slave->info.hostname() << ")"; - // Remove pointers to framework's tasks in slaves, and send status updates. - foreachvalue (Task* task, utils::copy(slave->tasks)) { + // Remove pointers to framework's tasks in slaves, and send status + // updates. + // NOTE: values() is used because statusUpdate() modifies + // slave->tasks. + foreach (Task* task, slave->tasks[framework->id].values()) { // Remove tasks that belong to this framework. if (task->framework_id() == framework->id) { // A framework might not actually exist because the master failed @@ -2281,26 +2289,31 @@ void Master::removeSlave(Slave* slave) allocator->slaveRemoved(slave->id); } - // Remove pointers to slave's tasks in frameworks, and send status updates - foreachvalue (Task* task, utils::copy(slave->tasks)) { - // A framework might not actually exist because the master failed - // over and the framework hasn't reconnected. 
This can be a tricky - // situation for frameworks that want to have high-availability, - // because if they eventually do connect they won't ever get a - // status update about this task. Perhaps in the future what we - // want to do is create a local Framework object to represent that - // framework until it fails over. See the TODO above in - // Master::reregisterSlave. - const StatusUpdate& update = protobuf::createStatusUpdate( - task->framework_id(), - task->slave_id(), - task->task_id(), - TASK_LOST, - "Slave " + slave->info.hostname() + " removed", - (task->has_executor_id() ? - Option<ExecutorID>(task->executor_id()) : None())); + // Remove pointers to slave's tasks in frameworks, and send status + // updates. + // NOTE: keys() and values() are used because statusUpdate() + // modifies slave->tasks. + foreach (const FrameworkID& frameworkId, slave->tasks.keys()) { + foreach (Task* task, slave->tasks[frameworkId].values()) { + // A framework might not actually exist because the master failed + // over and the framework hasn't reconnected. This can be a tricky + // situation for frameworks that want to have high-availability, + // because if they eventually do connect they won't ever get a + // status update about this task. Perhaps in the future what we + // want to do is create a local Framework object to represent that + // framework until it fails over. See the TODO above in + // Master::reregisterSlave. + const StatusUpdate& update = protobuf::createStatusUpdate( + task->framework_id(), + task->slave_id(), + task->task_id(), + TASK_LOST, + "Slave " + slave->info.hostname() + " removed", + (task->has_executor_id() ? 
+ Option<ExecutorID>(task->executor_id()) : None())); - statusUpdate(update, UPID()); + statusUpdate(update, UPID()); + } } foreach (Offer* offer, utils::copy(slave->offers)) { http://git-wip-us.apache.org/repos/asf/mesos/blob/859a1a87/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index bd5cb1f..0aeec7f 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -299,25 +299,19 @@ struct Slave Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId) { - foreachvalue (Task* task, tasks) { - if (task->framework_id() == frameworkId && - task->task_id() == taskId) { - return task; - } + if (tasks.contains(frameworkId) && tasks[frameworkId].contains(taskId)) { + return tasks[frameworkId][taskId]; } - return NULL; } void addTask(Task* task) { - std::pair<FrameworkID, TaskID> key = - std::make_pair(task->framework_id(), task->task_id()); - CHECK(!tasks.contains(key)) + CHECK(!tasks[task->framework_id()].contains(task->task_id())) << "Duplicate task " << task->task_id() << " of framework " << task->framework_id(); - tasks[key] = task; + tasks[task->framework_id()][task->task_id()] = task; LOG(INFO) << "Adding task " << task->task_id() << " with resources " << task->resources() << " on slave " << id << " (" << info.hostname() << ")"; @@ -326,13 +320,15 @@ struct Slave void removeTask(Task* task) { - std::pair<FrameworkID, TaskID> key = - std::make_pair(task->framework_id(), task->task_id()); - CHECK(tasks.contains(key)) + CHECK(tasks[task->framework_id()].contains(task->task_id())) << "Unknown task " << task->task_id() << " of framework " << task->framework_id(); - tasks.erase(key); + tasks[task->framework_id()].erase(task->task_id()); + if (tasks[task->framework_id()].empty()) { + tasks.erase(task->framework_id()); + } + killedTasks.remove(task->framework_id(), task->task_id()); LOG(INFO) << "Removing task " << task->task_id() << " with resources " << 
task->resources() @@ -413,11 +409,11 @@ struct Slave // Executors running on this slave. hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > executors; - // Tasks running on this slave, indexed by FrameworkID x TaskID. + // Tasks present on this slave. // TODO(bmahler): The task pointer ownership complexity arises from the fact // that we own the pointer here, but it's shared with the Framework struct. // We should find a way to eliminate this. - hashmap<std::pair<FrameworkID, TaskID>, Task*> tasks; + hashmap<FrameworkID, hashmap<TaskID, Task*> > tasks; // Tasks that were asked to kill by frameworks. // This is used for reconciliation when the slave re-registers.