Repository: mesos
Updated Branches:
  refs/heads/master 0750549cb -> 937bf8a89


Refactored task launching in the master.

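This replaces launchTask() with addTask(), which is similar to addSlave()
and addFramework(). addTask() no longer sends the RunTaskMessage; the
caller (_launchTasks()) sends it instead. The benefit is that the caller
can add the task and still have control over when the RunTaskMessage is
sent.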

Review: https://reviews.apache.org/r/28364


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/937bf8a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/937bf8a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/937bf8a8

Branch: refs/heads/master
Commit: 937bf8a89c642b1f49deec4bca3a3a55579df08c
Parents: 0750549
Author: Vinod Kone <[email protected]>
Authored: Tue Dec 2 14:24:48 2014 -0800
Committer: Benjamin Mahler <[email protected]>
Committed: Tue Dec 2 14:39:08 2014 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 46 +++++++++++++++++++++++-----------------------
 src/master/master.hpp | 12 ++++++------
 2 files changed, 29 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/937bf8a8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 8fcda4b..c840d49 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2422,15 +2422,15 @@ Future<bool> Master::authorizeTask(
 }
 
 
-Resources Master::launchTask(
+Resources Master::addTask(
     const TaskInfo& task,
     Framework* framework,
     Slave* slave)
 {
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(slave);
-  CHECK(slave->connected) << "Launching task " << task.task_id()
-                          << " on disconnected slave " << *slave;
+  CHECK(slave->connected) << "Adding task " << task.task_id()
+                          << " to disconnected slave " << *slave;
 
   // The resources consumed.
   Resources resources = task.resources();
@@ -2474,19 +2474,6 @@ Resources Master::launchTask(
   slave->addTask(t);
   framework->addTask(t);
 
-  // Tell the slave to launch the task!
-  LOG(INFO) << "Launching task " << task.task_id()
-            << " of framework " << *framework
-            << " with resources " << task.resources()
-            << " on slave " << *slave;
-
-  RunTaskMessage message;
-  message.mutable_framework()->MergeFrom(framework->info);
-  message.mutable_framework_id()->MergeFrom(framework->id);
-  message.set_pid(framework->pid);
-  message.mutable_task()->MergeFrom(task);
-  send(slave->pid, message);
-
   return resources;
 }
 
@@ -2541,7 +2528,6 @@ void Master::_launchTasks(
   }
 
   Resources usedResources; // Accumulated resources used.
-
   size_t index = 0;
   foreach (const Future<bool>& authorization, authorizations.get()) {
     const TaskInfo& task = tasks[index++];
@@ -2587,21 +2573,21 @@ void Master::_launchTasks(
     }
 
     // Validate the task.
-    const Option<Error>& validation = validateTask(
+    const Option<Error>& validationError = validateTask(
         task,
         framework,
         slave,
         totalResources,
         usedResources);
 
-    if (validation.isSome()) {
+    if (validationError.isSome()) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           framework->id,
           task.slave_id(),
           task.task_id(),
           TASK_ERROR,
           TaskStatus::SOURCE_MASTER,
-          validation.get().message,
+          validationError.get().message,
           TaskStatus::REASON_TASK_INVALID);
 
       metrics.tasks_error++;
@@ -2612,9 +2598,24 @@ void Master::_launchTasks(
       continue;
     }
 
-    // Launch task.
+    // Add task.
     if (pending) {
-      usedResources += launchTask(task, framework, slave);
+      usedResources += addTask(task, framework, slave);
+
+      // TODO(bmahler): Consider updating this log message to
+      // indicate when the executor is also being launched.
+      LOG(INFO) << "Launching task " << task.task_id()
+                << " of framework " << *framework
+                << " with resources " << task.resources()
+                << " on slave " << *slave;
+
+      RunTaskMessage message;
+      message.mutable_framework()->MergeFrom(framework->info);
+      message.mutable_framework_id()->MergeFrom(framework->id);
+      message.set_pid(framework->pid);
+      message.mutable_task()->MergeFrom(task);
+
+      send(slave->pid, message);
     }
   }
 
@@ -3073,7 +3074,6 @@ void Master::reregisterSlave(
     return;
   }
 
-
   if (slaves.removed.get(slaveInfo.id()).isSome()) {
     // To compensate for the case where a non-strict registrar is
     // being used, we explicitly deny removed slaves from

http://git-wip-us.apache.org/repos/asf/mesos/blob/937bf8a8/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 79b9ba7..e6ed87d 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -364,12 +364,6 @@ protected:
       const TaskInfo& task,
       Framework* framework);
 
-  // Launch a task from a task description.
-  Resources launchTask(
-      const TaskInfo& task,
-      Framework* framework,
-      Slave* slave);
-
   // 'launchTasks()' continuation.
   void _launchTasks(
       const FrameworkID& frameworkId,
@@ -379,6 +373,12 @@ protected:
       const Filters& filters,
       const process::Future<std::list<process::Future<bool>>>& authorizations);
 
+  // Add the task and its executor (if not already running) to the
+  // framework and slave. Returns the resources consumed as a result,
+  // which includes resources for the task and its executor
+  // (if not already running).
+  Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
+
   // Transitions the task, and recovers resources if the task becomes
   // terminal.
   void updateTask(Task* task, const StatusUpdate& update);

Reply via email to