Made master set `launch_executor` in the RunTask(Group)Message.

By setting a new field `launch_executor` in the RunTask(Group)Message,
the master is able to control executor creation on the agent.

Also refactored the `addTask()` logic. Added two new functions:
`isLaunchExecutor()` checks whether a new executor needs to be launched;
`addExecutor()` adds an executor to the framework and slave.

Review: https://reviews.apache.org/r/65504/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/10aa875d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/10aa875d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/10aa875d

Branch: refs/heads/1.5.x
Commit: 10aa875df8947f8cbfb318820101984d99259070
Parents: 08e0ceb
Author: Meng Zhu <m...@mesosphere.io>
Authored: Tue Feb 13 22:44:58 2018 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Wed Feb 14 03:41:16 2018 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 112 +++++++++++++++++++++++++++++++--------------
 src/master/master.hpp |  19 ++++++--
 2 files changed, 92 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/10aa875d/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2758000..2b093d6 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3883,44 +3883,56 @@ Future<bool> Master::authorizeSlave(const Option<string>& principal)
 }
 
 
-Resources Master::addTask(
-    const TaskInfo& task,
+bool Master::isLaunchExecutor(
+    const ExecutorID& executorId,
     Framework* framework,
-    Slave* slave)
+    Slave* slave) const
 {
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(slave);
-  CHECK(slave->connected) << "Adding task " << task.task_id()
-                          << " to disconnected agent " << *slave;
 
-  // The resources consumed.
-  Resources resources = task.resources();
+  if (!slave->hasExecutor(framework->id(), executorId)) {
+    CHECK(!framework->hasExecutor(slave->id, executorId))
+      << "Executor '" << executorId
+      << "' known to the framework " << *framework
+      << " but unknown to the agent " << *slave;
+    return true;
+  }
+
+  return false;
+}
+
 
-  // Determine if this task launches an executor, and if so make sure
-  // the slave and framework state has been updated accordingly.
+void Master::addExecutor(
+    const ExecutorInfo& executorInfo,
+    Framework* framework,
+    Slave* slave)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+  CHECK(slave->connected) << "Adding executor " << executorInfo.executor_id()
+                          << " to disconnected agent " << *slave;
 
-  if (task.has_executor()) {
-    // TODO(benh): Refactor this code into Slave::addTask.
-    if (!slave->hasExecutor(framework->id(), task.executor().executor_id())) {
-      CHECK(!framework->hasExecutor(slave->id, task.executor().executor_id()))
-        << "Executor '" << task.executor().executor_id()
-        << "' known to the framework " << *framework
-        << " but unknown to the agent " << *slave;
+  slave->addExecutor(framework->id(), executorInfo);
+  framework->addExecutor(slave->id, executorInfo);
+}
 
-      slave->addExecutor(framework->id(), task.executor());
-      framework->addExecutor(slave->id, task.executor());
 
-      resources += task.executor().resources();
-    }
-  }
+void Master::addTask(
+    const TaskInfo& task,
+    Framework* framework,
+    Slave* slave)
+{
+  CHECK_NOTNULL(framework);
+  CHECK_NOTNULL(slave);
+  CHECK(slave->connected) << "Adding task " << task.task_id()
+                          << " to disconnected agent " << *slave;
 
   // Add the task to the framework and slave.
   Task* t = new Task(protobuf::createTask(task, TASK_STAGING, framework->id()));
 
   slave->addTask(t);
   framework->addTask(t);
-
-  return resources;
 }
 
 
@@ -4953,7 +4965,23 @@ void Master::_accept(
 
           // Add task.
           if (pending) {
-            const Resources consumed = addTask(task, framework, slave);
+            Resources consumed;
+
+            bool launchExecutor = true;
+            if (task.has_executor()) {
+              launchExecutor = isLaunchExecutor(
+                  task.executor().executor_id(), framework, slave);
+
+              // Master tracks the new executor only if the task is not a
+              // command task.
+              if (launchExecutor) {
+                addExecutor(task.executor(), framework, slave);
+                consumed += task.executor().resources();
+              }
+            }
+
+            addTask(task, framework, slave);
+            consumed += task.resources();
 
             CHECK(available.contains(consumed))
               << available << " does not contain " << consumed;
@@ -4995,6 +5023,8 @@ void Master::_accept(
             message.set_pid(framework->pid.getOrElse(UPID()));
             message.mutable_task()->MergeFrom(task);
 
+            message.set_launch_executor(launchExecutor);
+
             if (HookManager::hooksAvailable()) {
               // Set labels retrieved from label-decorator hooks.
               message.mutable_task()->mutable_labels()->CopyFrom(
@@ -5013,11 +5043,11 @@ void Master::_accept(
               CHECK_SOME(downgradeResources(&message));
             }
 
-            // TODO(bmahler): Consider updating this log message to
-            // indicate when the executor is also being launched.
             LOG(INFO) << "Launching task " << task.task_id() << " of framework "
                       << *framework << " with resources " << task.resources()
-                      << " on agent " << *slave;
+                      << " on agent " << *slave << " on "
+                      << (launchExecutor ?
+                          "new executor" : "existing executor");
 
             send(slave->pid, message);
           }
@@ -5176,18 +5206,25 @@ void Master::_accept(
 
         set<TaskID> taskIds;
         Resources totalResources;
+        Resources executorResources;
+
+        bool launchExecutor =
+          isLaunchExecutor(executor.executor_id(), framework, slave);
+
+        if (launchExecutor) {
+          addExecutor(executor, framework, slave);
+          executorResources = executor.resources();
+          totalResources += executorResources;
+        }
+
+        message.set_launch_executor(launchExecutor);
 
         foreach (
             TaskInfo& task, *message.mutable_task_group()->mutable_tasks()) {
           taskIds.insert(task.task_id());
           totalResources += task.resources();
 
-          const Resources consumed = addTask(task, framework, slave);
-
-          CHECK(_offeredResources.contains(consumed))
-            << _offeredResources << " does not contain " << consumed;
-
-          _offeredResources -= consumed;
+          addTask(task, framework, slave);
 
           if (HookManager::hooksAvailable()) {
             // Set labels retrieved from label-decorator hooks.
@@ -5199,6 +5236,11 @@ void Master::_accept(
           }
         }
 
+        CHECK(_offeredResources.contains(totalResources))
+          << _offeredResources << " does not contain " << totalResources;
+
+        _offeredResources -= totalResources;
+
         // If the agent does not support reservation refinement, downgrade
         // the task and executor resources to the "pre-reservation-refinement"
         // format. This cannot contain any refined reservations since
@@ -5210,7 +5252,9 @@ void Master::_accept(
 
         LOG(INFO) << "Launching task group " << stringify(taskIds)
                   << " of framework " << *framework << " with resources "
-                  << totalResources << " on agent " << *slave;
+                  << totalResources - executorResources << " on agent "
+                  << *slave << " on "
+                  << (launchExecutor ? "new executor" : "existing executor");
 
         send(slave->pid, message);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/10aa875d/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index a94ef38..9030cad 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -845,11 +845,20 @@ protected:
       const Offer::Operation::Destroy& destroy,
       const Option<process::http::authentication::Principal>& principal);
 
-  // Add the task and its executor (if not already running) to the
-  // framework and slave. Returns the resources consumed as a result,
-  // which includes resources for the task and its executor
-  // (if not already running).
-  Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
+  // Returns true if a new executor needs to be launched on the agent.
+  bool isLaunchExecutor(
+      const ExecutorID& executorId,
+      Framework* framework,
+      Slave* slave) const;
+
+  // Add executor to the framework and slave.
+  void addExecutor(
+      const ExecutorInfo& executorInfo,
+      Framework* framework,
+      Slave* slave);
+
+  // Add task to the framework and slave.
+  void addTask(const TaskInfo& task, Framework* framework, Slave* slave);
 
   // Transitions the task, and recovers resources if the task becomes
   // terminal.

Reply via email to