Repository: mesos
Updated Branches:
  refs/heads/master 0b94805f6 -> 103d6e06a


Added new protobuf field `launch_executor` in RunTask(Group)Message.

This boolean flag lets the master specify whether the agent should
launch a new executor for the task or task group (with the
exception of the command executor). This allows the master
to control executor creation on the agent.

Also updated the relevant message handlers and mock functions.

Review: https://reviews.apache.org/r/65445/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7c29031b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7c29031b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7c29031b

Branch: refs/heads/master
Commit: 7c29031bf35232a9e8b0c8bbbb8c826d0185673a
Parents: 0b94805
Author: Meng Zhu <m...@mesosphere.io>
Authored: Tue Feb 13 22:44:48 2018 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Wed Feb 14 02:28:10 2018 -0800

----------------------------------------------------------------------
 src/messages/messages.proto | 22 ++++++++++++++----
 src/slave/slave.cpp         | 50 +++++++++++++++++++++++++++++-----------
 src/slave/slave.hpp         | 18 +++++++++++++--
 src/tests/mock_slave.cpp    | 16 ++++++++-----
 src/tests/mock_slave.hpp    | 16 ++++++++-----
 src/tests/slave_tests.cpp   |  6 ++---
 6 files changed, 93 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7c29031b/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 0db44a2..556801d 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -293,8 +293,6 @@ message ReviveOffersMessage {
 
 
 /**
- * Depending on the `TaskInfo`, this message either notifies an existing
- * executor to run the task, or starts a new executor and runs the task.
  * This message is sent when scheduler::Call::Accept is sent with
  * Offer::Operation::Launch.
  *
@@ -326,13 +324,19 @@ message RunTaskMessage {
   // forward executor messages through the master. For schedulers
   // still using the driver, this will remain set.
   optional string pid = 3;
+
+  // If this field is absent (as prior to 1.5), depending on the `TaskInfo`,
+  // this message either notifies an existing executor to run the task, or
+  // starts a new executor and runs the task.
+  //
+  // Starting with 1.5, this field will always be set. The agent should only
+  // launch an executor if the value of this field is true.
+  optional bool launch_executor = 6;
 }
 
 
 /**
- * This message either notifies an existing executor to run a task
- * group, or starts a new executor and runs the task group. This
- * message is sent when scheduler::Call::Accept is sent with
+ * This message is sent when scheduler::Call::Accept is sent with
  * Offer::Operation::LaunchGroup.
  *
  * See executor::Event::LaunchGroup.
@@ -354,6 +358,14 @@ message RunTaskGroupMessage {
   // because this means the operation is operating on resources that
   // might have already been invalidated.
   repeated ResourceVersionUUID resource_version_uuids = 4;
+
+  // If this field is absent (as prior to 1.5), depending on the `TaskInfo`,
+  // this message either notifies an existing executor to run the task group,
+  // or starts a new executor and runs the task group.
+  //
+  // Starting with 1.5, this field will always be set. The agent should only
+  // launch an executor if the value of this field is true.
+  optional bool launch_executor = 5;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c29031b/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index df8b33d..893a196 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -672,19 +672,10 @@ void Slave::initialize()
       &SlaveReregisteredMessage::connection);
 
   install<RunTaskMessage>(
-      &Slave::runTask,
-      &RunTaskMessage::framework,
-      &RunTaskMessage::framework_id,
-      &RunTaskMessage::pid,
-      &RunTaskMessage::task,
-      &RunTaskMessage::resource_version_uuids);
+      &Slave::handleRunTaskMessage);
 
   install<RunTaskGroupMessage>(
-      &Slave::runTaskGroup,
-      &RunTaskGroupMessage::framework,
-      &RunTaskGroupMessage::executor,
-      &RunTaskGroupMessage::task_group,
-      &RunTaskGroupMessage::resource_version_uuids);
+      &Slave::handleRunTaskGroupMessage);
 
   install<KillTaskMessage>(
       &Slave::killTask);
@@ -1900,6 +1891,22 @@ void Slave::doReliableRegistration(Duration maxBackoff)
 }
 
 
+void Slave::handleRunTaskMessage(
+    const UPID& from,
+    RunTaskMessage&& runTaskMessage)
+{
+  runTask(
+      from,
+      runTaskMessage.framework(),
+      runTaskMessage.framework_id(),
+      runTaskMessage.pid(),
+      runTaskMessage.task(),
+      google::protobuf::convert(runTaskMessage.resource_version_uuids()),
+      runTaskMessage.has_launch_executor() ?
+          Option<bool>(runTaskMessage.launch_executor()) : None());
+}
+
+
 // TODO(vinod): Instead of crashing the slave on checkpoint errors,
 // send TASK_LOST to the framework.
 void Slave::runTask(
@@ -1908,7 +1915,8 @@ void Slave::runTask(
     const FrameworkID& frameworkId,
     const UPID& pid,
     const TaskInfo& task,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   CHECK_NE(task.has_executor(), task.has_command())
     << "Task " << task.task_id()
@@ -3267,12 +3275,28 @@ void Slave::launchExecutor(
 }
 
 
+void Slave::handleRunTaskGroupMessage(
+    const UPID& from,
+    RunTaskGroupMessage&& runTaskGroupMessage)
+{
+  runTaskGroup(
+      from,
+      runTaskGroupMessage.framework(),
+      runTaskGroupMessage.executor(),
+      runTaskGroupMessage.task_group(),
+      google::protobuf::convert(runTaskGroupMessage.resource_version_uuids()),
+      runTaskGroupMessage.has_launch_executor() ?
+          Option<bool>(runTaskGroupMessage.launch_executor()) : None());
+}
+
+
 void Slave::runTaskGroup(
     const UPID& from,
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const TaskGroupInfo& taskGroupInfo,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   if (master != from) {
     LOG(WARNING) << "Ignoring run task group message from " << from

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c29031b/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 78b81e1..b2c7880 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -144,6 +144,12 @@ public:
 
   void doReliableRegistration(Duration maxBackoff);
 
+  // TODO(mzhu): Combine this with `runTask()` and replace all `runTask()`
+  // mocks with `run()` mocks.
+  void handleRunTaskMessage(
+      const process::UPID& from,
+      RunTaskMessage&& runTaskMessage);
+
   // Made 'virtual' for Slave mocking.
   virtual void runTask(
       const process::UPID& from,
@@ -151,7 +157,8 @@ public:
       const FrameworkID& frameworkId,
       const process::UPID& pid,
       const TaskInfo& task,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   void run(
       const FrameworkInfo& frameworkInfo,
@@ -184,13 +191,20 @@ public:
       const std::list<TaskInfo>& tasks,
       const std::list<TaskGroupInfo>& taskGroups);
 
+  // TODO(mzhu): Combine this with `runTaskGroup()` and replace all
+  // `runTaskGroup()` mocks with `run()` mocks.
+  void handleRunTaskGroupMessage(
+      const process::UPID& from,
+      RunTaskGroupMessage&& runTaskGroupMessage);
+
   // Made 'virtual' for Slave mocking.
   virtual void runTaskGroup(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const TaskGroupInfo& taskGroupInfo,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   // Made 'virtual' for Slave mocking.
   virtual void killTask(

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c29031b/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index 9d930cf..cf383bb 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -126,11 +126,11 @@ MockSlave::MockSlave(
   // Set up default behaviors, calling the original methods.
   EXPECT_CALL(*this, ___run(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked____run));
-  EXPECT_CALL(*this, runTask(_, _, _, _, _, _))
+  EXPECT_CALL(*this, runTask(_, _, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
   EXPECT_CALL(*this, _run(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__run));
-  EXPECT_CALL(*this, runTaskGroup(_, _, _, _, _))
+  EXPECT_CALL(*this, runTaskGroup(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTaskGroup));
   EXPECT_CALL(*this, killTask(_, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
@@ -175,7 +175,8 @@ void MockSlave::unmocked_runTask(
     const FrameworkID& frameworkId,
     const UPID& pid,
     const TaskInfo& task,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   slave::Slave::runTask(
       from,
@@ -183,7 +184,8 @@ void MockSlave::unmocked_runTask(
       frameworkInfo.id(),
       pid,
       task,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 }
 
 
@@ -210,14 +212,16 @@ void MockSlave::unmocked_runTaskGroup(
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const TaskGroupInfo& taskGroup,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   slave::Slave::runTaskGroup(
       from,
       frameworkInfo,
       executorInfo,
       taskGroup,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c29031b/src/tests/mock_slave.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 942ead5..39a8651 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -117,13 +117,14 @@ public:
       const std::list<TaskInfo>& tasks,
       const std::list<TaskGroupInfo>& taskGroups);
 
-  MOCK_METHOD6(runTask, void(
+  MOCK_METHOD7(runTask, void(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
       const process::UPID& pid,
       const TaskInfo& task,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids));
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor));
 
   void unmocked_runTask(
       const process::UPID& from,
@@ -131,7 +132,8 @@ public:
       const FrameworkID& frameworkId,
       const process::UPID& pid,
       const TaskInfo& task,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   MOCK_METHOD6(_run, void(
       const process::Future<std::list<bool>>& unschedules,
@@ -149,19 +151,21 @@ public:
       const Option<TaskGroupInfo>& taskGroup,
       const std::vector<ResourceVersionUUID>& resourceVersionUuids);
 
-  MOCK_METHOD5(runTaskGroup, void(
+  MOCK_METHOD6(runTaskGroup, void(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const TaskGroupInfo& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids));
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor));
 
   void unmocked_runTaskGroup(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const TaskGroupInfo& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   MOCK_METHOD2(killTask, void(
       const process::UPID& from,

http://git-wip-us.apache.org/repos/asf/mesos/blob/7c29031b/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index caa0056..3e82991 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -4114,7 +4114,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillRepeatedly(FutureArg<1>(&status));
 
-  EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _, _))
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask));
 
   // Saved arguments from Slave::_run().
@@ -4239,7 +4239,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
-  EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _, _))
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask))
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask));
 
@@ -7668,7 +7668,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
     .WillOnce(FutureArg<1>(&update2))
     .WillRepeatedly(Return());
 
-  EXPECT_CALL(*slave.get()->mock(), runTaskGroup(_, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), runTaskGroup(_, _, _, _, _, _))
     .WillOnce(Invoke(slave.get()->mock(),
                      &MockSlave::unmocked_runTaskGroup));
 

Reply via email to