Re-structured the Slave::tasks map from a flat hashmap keyed by (FrameworkID, TaskID) pairs into a two-level hashmap indexed first by FrameworkID and then by TaskID.

Review: https://reviews.apache.org/r/14438


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/859a1a87
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/859a1a87
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/859a1a87

Branch: refs/heads/master
Commit: 859a1a87a40a08df6b7c41367a21bf354bd18389
Parents: 4e14e31
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Tue Oct 1 10:33:20 2013 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Wed Oct 2 12:20:32 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 127 +++++++++++++++++++++++++--------------------
 src/master/master.hpp |  28 +++++-----
 2 files changed, 82 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/859a1a87/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6f6d66c..ce8365f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -240,8 +240,12 @@ Master::~Master()
     // Remove tasks that are in the slave but not in any framework.
     // This could happen when the framework has yet to reregister
     // after master failover.
-    foreachvalue (Task* task, utils::copy(slave->tasks)) {
-      removeTask(task);
+    // NOTE: keys() and values() are used because slave->tasks is
+    //       modified by removeTask()!
+    foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
+      foreach (Task* task, slave->tasks[frameworkId].values()) {
+        removeTask(task);
+      }
     }
 
     // Kill the slave observer.
@@ -569,13 +573,8 @@ void Master::exited(const UPID& pid)
         // If a slave is checkpointing, remove all non-checkpointing
         // frameworks from the slave.
         // First, collect all the frameworks running on this slave.
-        hashset<FrameworkID> frameworkIds;
-        foreachvalue (Task* task, slave->tasks) {
-          frameworkIds.insert(task->framework_id());
-        }
-        foreachkey (const FrameworkID& frameworkId, slave->executors) {
-          frameworkIds.insert(frameworkId);
-        }
+        hashset<FrameworkID> frameworkIds =
+          slave->tasks.keys() | slave->executors.keys();
 
         // Now, remove all the non-checkpointing frameworks.
         foreach (const FrameworkID& frameworkId, frameworkIds) {
@@ -783,20 +782,22 @@ void Master::reregisterFramework(const FrameworkInfo& 
frameworkInfo,
 
     // Add any running tasks reported by slaves for this framework.
     foreachvalue (Slave* slave, slaves) {
-      foreachvalue (Task* task, slave->tasks) {
-        if (framework->id == task->framework_id()) {
-          framework->addTask(task);
-          // Also add the task's executor for resource accounting.
-          if (task->has_executor_id()) {
-            if (!framework->hasExecutor(slave->id, task->executor_id())) {
-              CHECK(slave->hasExecutor(framework->id, task->executor_id()))
-                << "Unknown executor " << task->executor_id()
-                << " of framework " << framework->id
-                << " for the task " << task->task_id();
-
-              const ExecutorInfo& executorInfo =
-                slave->executors[framework->id][task->executor_id()];
-              framework->addExecutor(slave->id, executorInfo);
+      foreachkey (const FrameworkID& frameworkId, slave->tasks) {
+        foreachvalue (Task* task, slave->tasks[frameworkId]) {
+          if (framework->id == task->framework_id()) {
+            framework->addTask(task);
+            // Also add the task's executor for resource accounting.
+            if (task->has_executor_id()) {
+              if (!framework->hasExecutor(slave->id, task->executor_id())) {
+                CHECK(slave->hasExecutor(framework->id, task->executor_id()))
+                  << "Unknown executor " << task->executor_id()
+                  << " of framework " << framework->id
+                  << " for the task " << task->task_id();
+
+                const ExecutorInfo& executorInfo =
+                  slave->executors[framework->id][task->executor_id()];
+                framework->addExecutor(slave->id, executorInfo);
+              }
             }
           }
         }
@@ -1856,21 +1857,25 @@ void Master::reconcile(
   // missing from the slave. This could happen if the task was
   // dropped by the slave (e.g., slave exited before getting the
   // task or the task was launched while slave was in recovery).
-  foreachvalue (Task* task, utils::copy(slave->tasks)) {
-    if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
-      LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
-                   << " of framework " << task->framework_id()
-                   << " unknown to the slave " << slave->id
-                   << " (" << slave->info.hostname() << ")";
+  // NOTE: keys() and values() are used since statusUpdate()
+  //       modifies slave->tasks.
+  foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
+    foreach (Task* task, slave->tasks[frameworkId].values()) {
+      if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
+        LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
+                     << " of framework " << task->framework_id()
+                     << " unknown to the slave " << slave->id
+                     << " (" << slave->info.hostname() << ")";
 
-      const StatusUpdate& update = protobuf::createStatusUpdate(
-          task->framework_id(),
-          slave->id,
-          task->task_id(),
-          TASK_LOST,
-          "Task is unknown to the slave");
+        const StatusUpdate& update = protobuf::createStatusUpdate(
+            task->framework_id(),
+            slave->id,
+            task->task_id(),
+            TASK_LOST,
+            "Task is unknown to the slave");
 
-      statusUpdate(update, UPID());
+        statusUpdate(update, UPID());
+      }
     }
   }
 
@@ -2118,8 +2123,11 @@ void Master::removeFramework(Slave* slave, Framework* 
framework)
             << " from slave " << slave->id
             << " (" << slave->info.hostname() << ")";
 
-  // Remove pointers to framework's tasks in slaves, and send status updates.
-  foreachvalue (Task* task, utils::copy(slave->tasks)) {
+  // Remove pointers to framework's tasks in slaves, and send status
+  // updates.
+  // NOTE: values() is used because statusUpdate() modifies
+  //       slave->tasks.
+  foreach (Task* task, slave->tasks[framework->id].values()) {
     // Remove tasks that belong to this framework.
     if (task->framework_id() == framework->id) {
       // A framework might not actually exist because the master failed
@@ -2281,26 +2289,31 @@ void Master::removeSlave(Slave* slave)
     allocator->slaveRemoved(slave->id);
   }
 
-  // Remove pointers to slave's tasks in frameworks, and send status updates
-  foreachvalue (Task* task, utils::copy(slave->tasks)) {
-    // A framework might not actually exist because the master failed
-    // over and the framework hasn't reconnected. This can be a tricky
-    // situation for frameworks that want to have high-availability,
-    // because if they eventually do connect they won't ever get a
-    // status update about this task.  Perhaps in the future what we
-    // want to do is create a local Framework object to represent that
-    // framework until it fails over. See the TODO above in
-    // Master::reregisterSlave.
-    const StatusUpdate& update = protobuf::createStatusUpdate(
-        task->framework_id(),
-        task->slave_id(),
-        task->task_id(),
-        TASK_LOST,
-        "Slave " + slave->info.hostname() + " removed",
-        (task->has_executor_id() ?
-            Option<ExecutorID>(task->executor_id()) : None()));
+  // Remove pointers to slave's tasks in frameworks, and send status
+  // updates.
+  // NOTE: keys() and values() are used because statusUpdate()
+  //       modifies slave->tasks.
+  foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
+    foreach (Task* task, slave->tasks[frameworkId].values()) {
+      // A framework might not actually exist because the master failed
+      // over and the framework hasn't reconnected. This can be a tricky
+      // situation for frameworks that want to have high-availability,
+      // because if they eventually do connect they won't ever get a
+      // status update about this task.  Perhaps in the future what we
+      // want to do is create a local Framework object to represent that
+      // framework until it fails over. See the TODO above in
+      // Master::reregisterSlave.
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          task->framework_id(),
+          task->slave_id(),
+          task->task_id(),
+          TASK_LOST,
+          "Slave " + slave->info.hostname() + " removed",
+          (task->has_executor_id() ?
+              Option<ExecutorID>(task->executor_id()) : None()));
 
-    statusUpdate(update, UPID());
+      statusUpdate(update, UPID());
+    }
   }
 
   foreach (Offer* offer, utils::copy(slave->offers)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/859a1a87/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index bd5cb1f..0aeec7f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -299,25 +299,19 @@ struct Slave
 
   Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId)
   {
-    foreachvalue (Task* task, tasks) {
-      if (task->framework_id() == frameworkId &&
-          task->task_id() == taskId) {
-        return task;
-      }
+    if (tasks.contains(frameworkId) && tasks[frameworkId].contains(taskId)) {
+      return tasks[frameworkId][taskId];
     }
-
     return NULL;
   }
 
   void addTask(Task* task)
   {
-    std::pair<FrameworkID, TaskID> key =
-      std::make_pair(task->framework_id(), task->task_id());
-    CHECK(!tasks.contains(key))
+    CHECK(!tasks[task->framework_id()].contains(task->task_id()))
       << "Duplicate task " << task->task_id()
       << " of framework " << task->framework_id();
 
-    tasks[key] = task;
+    tasks[task->framework_id()][task->task_id()] = task;
     LOG(INFO) << "Adding task " << task->task_id()
               << " with resources " << task->resources()
               << " on slave " << id << " (" << info.hostname() << ")";
@@ -326,13 +320,15 @@ struct Slave
 
   void removeTask(Task* task)
   {
-    std::pair<FrameworkID, TaskID> key =
-      std::make_pair(task->framework_id(), task->task_id());
-    CHECK(tasks.contains(key))
+    CHECK(tasks[task->framework_id()].contains(task->task_id()))
       << "Unknown task " << task->task_id()
       << " of framework " << task->framework_id();
 
-    tasks.erase(key);
+    tasks[task->framework_id()].erase(task->task_id());
+    if (tasks[task->framework_id()].empty()) {
+      tasks.erase(task->framework_id());
+    }
+
     killedTasks.remove(task->framework_id(), task->task_id());
     LOG(INFO) << "Removing task " << task->task_id()
               << " with resources " << task->resources()
@@ -413,11 +409,11 @@ struct Slave
   // Executors running on this slave.
   hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > executors;
 
-  // Tasks running on this slave, indexed by FrameworkID x TaskID.
+  // Tasks present on this slave.
   // TODO(bmahler): The task pointer ownership complexity arises from the fact
   // that we own the pointer here, but it's shared with the Framework struct.
   // We should find a way to eliminate this.
-  hashmap<std::pair<FrameworkID, TaskID>, Task*> tasks;
+  hashmap<FrameworkID, hashmap<TaskID, Task*> > tasks;
 
   // Tasks that were asked to kill by frameworks.
   // This is used for reconciliation when the slave re-registers.

Reply via email to