Fixed a bug where executor info lingers on the master if the executor failed to launch.

The master relies on `ExitedExecutorMessage` from the agent to remove
executor entries. However, this message won't be sent if an executor
never actually launched (due to a transient error), leaving the executor
info on the master and the executor's resources claimed.
See MESOS-1720.

This patch fixes this issue by sending the `ExitedExecutorMessage`
from the agent if the executor is never launched.

Review: https://reviews.apache.org/r/65449/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2bdf4935
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2bdf4935
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2bdf4935

Branch: refs/heads/1.5.x
Commit: 2bdf4935b7929d0dce614d76461cddb991df89da
Parents: fb0e2f1
Author: Meng Zhu <m...@mesosphere.io>
Authored: Tue Feb 13 22:45:07 2018 -0800
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Wed Feb 14 03:41:16 2018 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp       | 276 +++++++++++++++++++++++++++++++++++++----
 src/slave/slave.hpp       |  10 +-
 src/tests/mock_slave.cpp  |   8 +-
 src/tests/mock_slave.hpp  |   8 +-
 src/tests/slave_tests.cpp |  35 ++++--
 5 files changed, 292 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2bdf4935/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 4e2b33a..c0501f8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1935,7 +1935,13 @@ void Slave::runTask(
 
   const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
 
-  run(frameworkInfo, executorInfo, task, None(), resourceVersionUuids, pid);
+  run(frameworkInfo,
+      executorInfo,
+      task,
+      None(),
+      resourceVersionUuids,
+      pid,
+      launchExecutor);
 }
 
 
@@ -1945,7 +1951,8 @@ void Slave::run(
     Option<TaskInfo> task,
     Option<TaskGroupInfo> taskGroup,
     const vector<ResourceVersionUUID>& resourceVersionUuids,
-    const UPID& pid)
+    const UPID& pid,
+    const Option<bool>& launchExecutor)
 {
   CHECK_NE(task.isSome(), taskGroup.isSome())
     << "Either task or task group should be set but not both";
@@ -2030,6 +2037,10 @@ void Slave::run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " because the agent is " << state;
 
+    // We do not send `ExitedExecutorMessage` here because the disconnected
+    // agent is expected to (eventually) reregister and reconcile the executor
+    // states with the master.
+
     // TODO(vinod): Consider sending a TASK_LOST here.
     // Currently it is tricky because 'statusUpdate()'
     // ignores updates for unknown frameworks.
@@ -2150,7 +2161,8 @@ void Slave::run(
                  executorInfo,
                  task,
                  taskGroup,
-                 resourceVersionUuids));
+                 resourceVersionUuids,
+                 launchExecutor));
 }
 
 
@@ -2160,7 +2172,8 @@ void Slave::_run(
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
     const Option<TaskGroupInfo>& taskGroup,
-    const std::vector<ResourceVersionUUID>& resourceVersionUuids)
+    const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   // TODO(anindya_sinha): Consider refactoring the initial steps common
   // to `_run()` and `__run()`.
@@ -2182,6 +2195,14 @@ void Slave::_run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " because the framework " << frameworkId
                  << " does not exist";
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2202,6 +2223,13 @@ void Slave::_run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2226,6 +2254,14 @@ void Slave::_run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " of framework " << frameworkId
                  << " because it has been killed in the meantime";
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2270,6 +2306,13 @@ void Slave::_run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2293,7 +2336,8 @@ void Slave::_run(
                  executorInfo,
                  task,
                  taskGroup,
-                 resourceVersionUuids));
+                 resourceVersionUuids,
+                 launchExecutor));
 }
 
 
@@ -2303,7 +2347,8 @@ void Slave::__run(
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
     const Option<TaskGroupInfo>& taskGroup,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   CHECK_NE(task.isSome(), taskGroup.isSome())
     << "Either task or task group should be set but not both";
@@ -2323,6 +2368,14 @@ void Slave::__run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " because the framework " << frameworkId
                  << " does not exist";
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2345,6 +2398,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2369,6 +2429,14 @@ void Slave::__run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " of framework " << frameworkId
                  << " because it has been killed in the meantime";
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2445,6 +2513,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2525,6 +2600,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2592,6 +2674,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2645,6 +2734,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2664,8 +2760,8 @@ void Slave::__run(
       removeFramework(framework);
     }
 
-    // We don't send a TASK_LOST here because the slave is
-    // terminating.
+    // We don't send TASK_LOST or ExitedExecutorMessage here because the slave
+    // is terminating.
     return;
   }
 
@@ -2674,29 +2770,159 @@ void Slave::__run(
   LOG(INFO) << "Launching " << taskOrTaskGroup(task, taskGroup)
             << " for framework " << frameworkId;
 
-  // Either send the task/task group to an executor or start a new executor
-  // and queue it until the executor has started.
-  Executor* executor = framework->getExecutor(executorId);
-
-  if (executor == nullptr) {
-    executor = framework->addExecutor(executorInfo);
+  auto doLaunchExecutor = [&]() {
+    Executor* executor = framework->addExecutor(executorInfo);
 
     if (secretGenerator) {
       generateSecret(framework->id(), executor->id, executor->containerId)
         .onAny(defer(
-            self(),
-            &Self::launchExecutor,
-            lambda::_1,
-            frameworkId,
-            executorId,
-            taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
+              self(),
+              &Self::launchExecutor,
+              lambda::_1,
+              frameworkId,
+              executorId,
+              taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
     } else {
-      launchExecutor(
+      Slave::launchExecutor(
           None(),
           frameworkId,
           executorId,
           taskGroup.isNone() ? task.get() : Option<TaskInfo>::none());
     }
+
+    return executor;
+  };
+
+  Executor* executor = framework->getExecutor(executorId);
+
+  if (launchExecutor.isNone()) {
+    // This is the legacy case where the master did not set the
+    // `launch_executor` flag. Executor will be launched if there is none.
+    if (executor == nullptr) {
+      executor = doLaunchExecutor();
+    }
+  } else {
+    if (taskGroup.isNone() && task.get().has_command()) {
+      // We are dealing with command task; a new command executor will be
+      // launched.
+      CHECK(executor == nullptr);
+      executor = doLaunchExecutor();
+    } else {
+      // Master set the `launch_executor` flag and this is not a command task.
+      if (launchExecutor.get()) {
+        // Master requests launching a new executor.
+        if (executor == nullptr) {
+          executor = doLaunchExecutor();
+        } else {
+          // Master requests launching executor but an executor still exists
+          // on the agent. In this case we will drop tasks. This could happen if
+          // the executor is already terminated on the agent (and agent has sent
+          // out the `ExitedExecutorMessage` and it was received by the master)
+          // but the agent is still waiting for all the status updates to be
+          // acked before removing the executor struct.
+
+          // We report TASK_DROPPED to the framework because the task was
+          // never launched. For non-partition-aware frameworks, we report
+          // TASK_LOST for backward compatibility.
+          mesos::TaskState taskState = TASK_DROPPED;
+          if (!protobuf::frameworkHasCapability(
+              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+            taskState = TASK_LOST;
+          }
+
+          foreach (const TaskInfo& _task, tasks) {
+            const StatusUpdate update = protobuf::createStatusUpdate(
+                frameworkId,
+                info.id(),
+                _task.task_id(),
+                taskState,
+                TaskStatus::SOURCE_SLAVE,
+                id::UUID::random(),
+                "Master wants to launch executor, but there already exits one",
+                TaskStatus::REASON_EXECUTOR_TERMINATED,
+                executorId);
+
+            statusUpdate(update, UPID());
+          }
+
+          // Master expects new executor to be launched for this task(s) launch.
+          // To keep the master executor entries updated, the agent needs to
+          // send 'ExitedExecutorMessage' even though no executor launched.
+          if (executor->state == Executor::TERMINATED) {
+            sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+          } else {
+            // This could happen if the following sequence of events happen:
+            //
+            //  (1) Master sends `runTaskMessage` to agent with
+            //      `launch_executor = true`;
+            //
+            //  (2) Before the agent got the `runTaskMessage`, it reconnects and
+            //      reconciles with the master. Master then removes the executor
+            //      entry it asked the agent to launch in step (1);
+            //
+            //  (3) Agent got the `runTaskMessage` sent in step (1), launches
+            //      the task and the executor (that the master does not know
+            //      about).
+            //
+            //  (4) Master now sends another `runTaskMessage` for the same
+            //      executor id with `launch_executor = true`.
+            //
+            // The agent ends up with a lingering executor that the master does
+            // not know about. We will shutdown the executor.
+            //
+            // TODO(mzhu): This could be avoided if the agent can
+            // tell whether the master's message was sent before or after the
+            // reconnection and discard the message in the former case.
+            //
+            // TODO(mzhu): Master needs to do proper executor reconciliation
+            // with the agent to avoid this from happening.
+            _shutdownExecutor(framework, executor);
+          }
+
+          return;
+        }
+      } else {
+        // Master does not want to launch executor.
+        if (executor == nullptr) {
+          // Master wants no new executor launched and there is none running on
+          // the agent. This could happen if the task expects some previous
+          // tasks to launch the executor. However, the earlier task got killed
+          // or dropped hence did not launch the executor but the master doesn't
+          // know about it yet because the `ExitedExecutorMessage` is still in
+          // flight. In this case, we will drop the task.
+          //
+          // We report TASK_DROPPED to the framework because the task was
+          // never launched. For non-partition-aware frameworks, we report
+          // TASK_LOST for backward compatibility.
+          mesos::TaskState taskState = TASK_DROPPED;
+          if (!protobuf::frameworkHasCapability(
+              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+            taskState = TASK_LOST;
+          }
+
+          foreach (const TaskInfo& _task, tasks) {
+            const StatusUpdate update = protobuf::createStatusUpdate(
+                frameworkId,
+                info.id(),
+                _task.task_id(),
+                taskState,
+                TaskStatus::SOURCE_SLAVE,
+                id::UUID::random(),
+                "No executor is expected to launch and there is none running",
+                TaskStatus::REASON_EXECUTOR_TERMINATED,
+                executorId);
+
+            statusUpdate(update, UPID());
+          }
+
+          // We do not send `ExitedExecutorMessage` here because the expectation
+          // is that there is already one in flight to master. If the message
+          // gets dropped, we will hopefully reconcile with the master later.
+
+          return;
+        }
+      }
+    }
   }
 
   CHECK_NOTNULL(executor);
@@ -3310,20 +3536,22 @@ void Slave::runTaskGroup(
     return;
   }
 
+  // TODO(mzhu): Consider doing a `CHECK` here since this shouldn't be possible.
   if (taskGroupInfo.tasks().empty()) {
     LOG(ERROR) << "Ignoring run task group message from " << from
                << " for framework " << frameworkInfo.id()
                << " because it has no tasks";
+
     return;
   }
 
-  run(
-      frameworkInfo,
+  run(frameworkInfo,
       executorInfo,
       None(),
       taskGroupInfo,
       resourceVersionUuids,
-      UPID());
+      UPID(),
+      launchExecutor);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2bdf4935/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index efe4a54..75e8ccd 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -166,7 +166,8 @@ public:
       Option<TaskInfo> task,
       Option<TaskGroupInfo> taskGroup,
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
-      const process::UPID& pid);
+      const process::UPID& pid,
+      const Option<bool>& launchExecutor);
 
   // Made 'virtual' for Slave mocking.
   virtual void _run(
@@ -175,7 +176,8 @@ public:
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   // TODO(mzhu): Combine this with `runTaskGroup()' and replace all
   // `runTaskGroup()' mock with `run()` mock.
@@ -391,7 +393,9 @@ public:
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
+
 
   // This is called when the resource limits of the container have
   // been updated for the given tasks and task groups. If the update is

http://git-wip-us.apache.org/repos/asf/mesos/blob/2bdf4935/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index 0dbb30a..f73a45f 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -120,7 +120,7 @@ MockSlave::MockSlave(
   // Set up default behaviors, calling the original methods.
   EXPECT_CALL(*this, runTask(_, _, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
-  EXPECT_CALL(*this, _run(_, _, _, _, _, _))
+  EXPECT_CALL(*this, _run(_, _, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__run));
   EXPECT_CALL(*this, runTaskGroup(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTaskGroup));
@@ -167,7 +167,8 @@ void MockSlave::unmocked__run(
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& taskInfo,
     const Option<TaskGroupInfo>& taskGroup,
-    const std::vector<ResourceVersionUUID>& resourceVersionUuids)
+    const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   slave::Slave::_run(
       unschedules,
@@ -175,7 +176,8 @@ void MockSlave::unmocked__run(
       executorInfo,
       taskInfo,
       taskGroup,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2bdf4935/src/tests/mock_slave.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 8cc3820..42f7d55 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -119,13 +119,14 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor);
 
-  MOCK_METHOD6(_run, void(
+  MOCK_METHOD7(_run, void(
       const process::Future<std::list<bool>>& unschedules,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids));
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor));
 
   void unmocked__run(
       const process::Future<std::list<bool>>& unschedules,
@@ -133,7 +134,8 @@ public:
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   MOCK_METHOD6(runTaskGroup, void(
       const process::UPID& from,

http://git-wip-us.apache.org/repos/asf/mesos/blob/2bdf4935/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 561dd15..95990c4 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1834,7 +1834,7 @@ TEST_F(SlaveTest, GetStateTaskGroupPending)
   // unmocked `_run()` method. Instead, we want to do nothing so that tasks
   // remain in the framework's 'pending' list.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
     .WillOnce(FutureSatisfy(&_run));
 
   // The executor should not be launched.
@@ -4120,18 +4120,20 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   Option<TaskGroupInfo> taskGroup;
   Option<TaskInfo> task_;
   vector<ResourceVersionUUID> resourceVersionUuids;
+  Option<bool> launchExecutor;
   // Skip what Slave::_run() normally does, save its arguments for
   // later, tie reaching the critical moment when to kill the task to
   // a future.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
                     SaveArg<0>(&unschedules),
                     SaveArg<1>(&frameworkInfo),
                     SaveArg<2>(&executorInfo),
                     SaveArg<3>(&task_),
                     SaveArg<4>(&taskGroup),
-                    SaveArg<5>(&resourceVersionUuids)));
+                    SaveArg<5>(&resourceVersionUuids),
+                    SaveArg<6>(&launchExecutor)));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
@@ -4163,7 +4165,8 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
       executorInfo,
       task_,
       taskGroup,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
@@ -4248,23 +4251,26 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   Option<TaskGroupInfo> taskGroup1, taskGroup2;
   Option<TaskInfo> task_1, task_2;
   vector<ResourceVersionUUID> resourceVersionUuids1, resourceVersionUuids2;
+  Option<bool> launchExecutor1, launchExecutor2;
 
   Future<Nothing> _run1, _run2;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run1),
                     SaveArg<0>(&unschedules1),
                     SaveArg<1>(&frameworkInfo1),
                     SaveArg<2>(&executorInfo1),
                     SaveArg<3>(&task_1),
                     SaveArg<4>(&taskGroup1),
-                    SaveArg<5>(&resourceVersionUuids1)))
+                    SaveArg<5>(&resourceVersionUuids1),
+                    SaveArg<6>(&launchExecutor1)))
     .WillOnce(DoAll(FutureSatisfy(&_run2),
                     SaveArg<0>(&unschedules2),
                     SaveArg<1>(&frameworkInfo2),
                     SaveArg<2>(&executorInfo2),
                     SaveArg<3>(&task_2),
                     SaveArg<4>(&taskGroup2),
-                    SaveArg<5>(&resourceVersionUuids2)));
+                    SaveArg<5>(&resourceVersionUuids2),
+                    SaveArg<6>(&launchExecutor2)));
 
   driver.launchTasks(offers.get()[0].id(), {task1, task2});
 
@@ -4306,7 +4312,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
       executorInfo1,
       task_1,
       taskGroup1,
-      resourceVersionUuids1);
+      resourceVersionUuids1,
+      launchExecutor1);
 
   slave.get()->mock()->unmocked__run(
       unschedules2,
@@ -4314,7 +4321,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
       executorInfo2,
       task_2,
       taskGroup2,
-      resourceVersionUuids2);
+      resourceVersionUuids2,
+      launchExecutor2);
 
   Clock::settle();
 
@@ -7198,19 +7206,21 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   Option<TaskGroupInfo> taskGroup_;
   Option<TaskInfo> task_;
   vector<ResourceVersionUUID> resourceVersionUuids;
+  Option<bool> launchExecutor;
 
   // Skip what `Slave::_run()` normally does, save its arguments for
   // later, till reaching the critical moment when to kill the task
   // in the future.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
                     SaveArg<0>(&unschedules),
                     SaveArg<1>(&frameworkInfo),
                     SaveArg<2>(&executorInfo_),
                     SaveArg<3>(&task_),
                     SaveArg<4>(&taskGroup_),
-                    SaveArg<5>(&resourceVersionUuids)));
+                    SaveArg<5>(&resourceVersionUuids),
+                    SaveArg<6>(&launchExecutor)));
 
   const v1::Offer& offer = offers->offers(0);
   const SlaveID slaveId = devolve(offer.agent_id());
@@ -7276,7 +7286,8 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
       executorInfo_,
       task_,
       taskGroup_,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 
   AWAIT_READY(update1);
   AWAIT_READY(update2);

Reply via email to