Repository: mesos
Updated Branches:
  refs/heads/1.5.x 13deae3ec -> ee0fb5908


Added a new protobuf field `launch_executor` to RunTask(Group)Message.

The master uses this boolean flag to specify whether a new
executor should be launched for the task or task group (with
the exception of the command executor). This allows the master
to control executor creation on the agent.

Also updated the relevant message handlers and mock functions.

Review: https://reviews.apache.org/r/65445/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/08e0ceb8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/08e0ceb8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/08e0ceb8

Branch: refs/heads/1.5.x
Commit: 08e0ceb84e4bf353e1f938482bb6766bf73310c7
Parents: 13deae3
Author: Meng Zhu <m...@mesosphere.io>
Authored: Tue Feb 13 22:44:48 2018 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Wed Feb 14 03:41:15 2018 -0800

----------------------------------------------------------------------
 src/messages/messages.proto | 22 ++++++++++++++----
 src/slave/slave.cpp         | 50 +++++++++++++++++++++++++++++-----------
 src/slave/slave.hpp         | 18 +++++++++++++--
 src/tests/mock_slave.cpp    | 16 ++++++++-----
 src/tests/mock_slave.hpp    | 16 ++++++++-----
 src/tests/slave_tests.cpp   |  6 ++---
 6 files changed, 93 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/08e0ceb8/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 0db44a2..556801d 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -293,8 +293,6 @@ message ReviveOffersMessage {
 
 
 /**
- * Depending on the `TaskInfo`, this message either notifies an existing
- * executor to run the task, or starts a new executor and runs the task.
  * This message is sent when scheduler::Call::Accept is sent with
  * Offer::Operation::Launch.
  *
@@ -326,13 +324,19 @@ message RunTaskMessage {
   // forward executor messages through the master. For schedulers
   // still using the driver, this will remain set.
   optional string pid = 3;
+
+  // If this field is absent (as prior to 1.5), depending on the `TaskInfo`,
+  // this message either notifies an existing executor to run the task, or
+  // starts a new executor and runs the task.
+  //
+  // Starting with 1.5, this field will always be set. The agent should only
+  // launch an executor if the value of this field is true.
+  optional bool launch_executor = 6;
 }
 
 
 /**
- * This message either notifies an existing executor to run a task
- * group, or starts a new executor and runs the task group. This
- * message is sent when scheduler::Call::Accept is sent with
+ * This message is sent when scheduler::Call::Accept is sent with
  * Offer::Operation::LaunchGroup.
  *
  * See executor::Event::LaunchGroup.
@@ -354,6 +358,14 @@ message RunTaskGroupMessage {
   // because this means the operation is operating on resources that
   // might have already been invalidated.
   repeated ResourceVersionUUID resource_version_uuids = 4;
+
+  // If this field is absent (as prior to 1.5), depending on the `TaskInfo`,
+  // this message either notifies an existing executor to run the task group,
+  // or starts a new executor and runs the task group.
+  //
+  // Starting with 1.5, this field will always be set. The agent should only
+  // launch an executor if the value of this field is true.
+  optional bool launch_executor = 5;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/08e0ceb8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 151c5eb..4470b09 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -672,19 +672,10 @@ void Slave::initialize()
       &SlaveReregisteredMessage::connection);
 
   install<RunTaskMessage>(
-      &Slave::runTask,
-      &RunTaskMessage::framework,
-      &RunTaskMessage::framework_id,
-      &RunTaskMessage::pid,
-      &RunTaskMessage::task,
-      &RunTaskMessage::resource_version_uuids);
+      &Slave::handleRunTaskMessage);
 
   install<RunTaskGroupMessage>(
-      &Slave::runTaskGroup,
-      &RunTaskGroupMessage::framework,
-      &RunTaskGroupMessage::executor,
-      &RunTaskGroupMessage::task_group,
-      &RunTaskGroupMessage::resource_version_uuids);
+      &Slave::handleRunTaskGroupMessage);
 
   install<KillTaskMessage>(
       &Slave::killTask);
@@ -1898,6 +1889,22 @@ void Slave::doReliableRegistration(Duration maxBackoff)
 }
 
 
+void Slave::handleRunTaskMessage(
+    const UPID& from,
+    RunTaskMessage&& runTaskMessage)
+{
+  runTask(
+      from,
+      runTaskMessage.framework(),
+      runTaskMessage.framework_id(),
+      runTaskMessage.pid(),
+      runTaskMessage.task(),
+      google::protobuf::convert(runTaskMessage.resource_version_uuids()),
+      runTaskMessage.has_launch_executor() ?
+          Option<bool>(runTaskMessage.launch_executor()) : None());
+}
+
+
 // TODO(vinod): Instead of crashing the slave on checkpoint errors,
 // send TASK_LOST to the framework.
 void Slave::runTask(
@@ -1906,7 +1913,8 @@ void Slave::runTask(
     const FrameworkID& frameworkId,
     const UPID& pid,
     const TaskInfo& task,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   CHECK_NE(task.has_executor(), task.has_command())
     << "Task " << task.task_id()
@@ -3266,12 +3274,28 @@ void Slave::launchExecutor(
 }
 
 
+void Slave::handleRunTaskGroupMessage(
+    const UPID& from,
+    RunTaskGroupMessage&& runTaskGroupMessage)
+{
+  runTaskGroup(
+      from,
+      runTaskGroupMessage.framework(),
+      runTaskGroupMessage.executor(),
+      runTaskGroupMessage.task_group(),
+      google::protobuf::convert(runTaskGroupMessage.resource_version_uuids()),
+      runTaskGroupMessage.has_launch_executor() ?
+          Option<bool>(runTaskGroupMessage.launch_executor()) : None());
+}
+
+
 void Slave::runTaskGroup(
     const UPID& from,
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const TaskGroupInfo& taskGroupInfo,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   if (master != from) {
     LOG(WARNING) << "Ignoring run task group message from " << from

http://git-wip-us.apache.org/repos/asf/mesos/blob/08e0ceb8/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 45cea0d..a4f031c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -144,6 +144,12 @@ public:
 
   void doReliableRegistration(Duration maxBackoff);
 
+  // TODO(mzhu): Combine this with `runTask()` and replace all `runTask()`
+  // mocks with `run()` mocks.
+  void handleRunTaskMessage(
+      const process::UPID& from,
+      RunTaskMessage&& runTaskMessage);
+
   // Made 'virtual' for Slave mocking.
   virtual void runTask(
       const process::UPID& from,
@@ -151,7 +157,8 @@ public:
       const FrameworkID& frameworkId,
       const process::UPID& pid,
       const TaskInfo& task,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   void run(
       const FrameworkInfo& frameworkInfo,
@@ -170,13 +177,20 @@ public:
       const Option<TaskGroupInfo>& taskGroup,
       const std::vector<ResourceVersionUUID>& resourceVersionUuids);
 
+  // TODO(mzhu): Combine this with `runTaskGroup()` and replace all
+  // `runTaskGroup()` mocks with `run()` mocks.
+  void handleRunTaskGroupMessage(
+      const process::UPID& from,
+      RunTaskGroupMessage&& runTaskGroupMessage);
+
   // Made 'virtual' for Slave mocking.
   virtual void runTaskGroup(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const TaskGroupInfo& taskGroupInfo,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   // Made 'virtual' for Slave mocking.
   virtual void killTask(

http://git-wip-us.apache.org/repos/asf/mesos/blob/08e0ceb8/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index 8357edc..0dbb30a 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -118,11 +118,11 @@ MockSlave::MockSlave(
         authorizer)
 {
   // Set up default behaviors, calling the original methods.
-  EXPECT_CALL(*this, runTask(_, _, _, _, _, _))
+  EXPECT_CALL(*this, runTask(_, _, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
   EXPECT_CALL(*this, _run(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__run));
-  EXPECT_CALL(*this, runTaskGroup(_, _, _, _, _))
+  EXPECT_CALL(*this, runTaskGroup(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTaskGroup));
   EXPECT_CALL(*this, killTask(_, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
@@ -147,7 +147,8 @@ void MockSlave::unmocked_runTask(
     const FrameworkID& frameworkId,
     const UPID& pid,
     const TaskInfo& task,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   slave::Slave::runTask(
       from,
@@ -155,7 +156,8 @@ void MockSlave::unmocked_runTask(
       frameworkInfo.id(),
       pid,
       task,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 }
 
 
@@ -182,14 +184,16 @@ void MockSlave::unmocked_runTaskGroup(
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const TaskGroupInfo& taskGroup,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   slave::Slave::runTaskGroup(
       from,
       frameworkInfo,
       executorInfo,
       taskGroup,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/08e0ceb8/src/tests/mock_slave.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 29ce714..8cc3820 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -101,13 +101,14 @@ public:
       SecretGenerator* secretGenerator,
       const Option<Authorizer*>& authorizer);
 
-  MOCK_METHOD6(runTask, void(
+  MOCK_METHOD7(runTask, void(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
       const process::UPID& pid,
       const TaskInfo& task,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids));
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor));
 
   void unmocked_runTask(
       const process::UPID& from,
@@ -115,7 +116,8 @@ public:
       const FrameworkID& frameworkId,
       const process::UPID& pid,
       const TaskInfo& task,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   MOCK_METHOD6(_run, void(
       const process::Future<std::list<bool>>& unschedules,
@@ -133,19 +135,21 @@ public:
       const Option<TaskGroupInfo>& taskGroup,
       const std::vector<ResourceVersionUUID>& resourceVersionUuids);
 
-  MOCK_METHOD5(runTaskGroup, void(
+  MOCK_METHOD6(runTaskGroup, void(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const TaskGroupInfo& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids));
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor));
 
   void unmocked_runTaskGroup(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const TaskGroupInfo& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   MOCK_METHOD2(killTask, void(
       const process::UPID& from,

http://git-wip-us.apache.org/repos/asf/mesos/blob/08e0ceb8/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 1591920..561dd15 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -4110,7 +4110,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillRepeatedly(FutureArg<1>(&status));
 
-  EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _, _))
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask));
 
   // Saved arguments from Slave::_run().
@@ -4235,7 +4235,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
-  EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), runTask(_, _, _, _, _, _, _))
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask))
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask));
 
@@ -7187,7 +7187,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
     .WillOnce(FutureArg<1>(&update2))
     .WillRepeatedly(Return());
 
-  EXPECT_CALL(*slave.get()->mock(), runTaskGroup(_, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), runTaskGroup(_, _, _, _, _, _))
     .WillOnce(Invoke(slave.get()->mock(),
                      &MockSlave::unmocked_runTaskGroup));
 

Reply via email to