Repository: mesos
Updated Branches:
  refs/heads/1.5.x d2289d169 -> 3f8b19a92


Refactored agent task launch for better composition [1/2].

This encapsulates a task launch in a single future,
which will come in handy when enforcing the task
launch order.

This patch also consolidates the error handling code
in the task launch path.

Affected tests are also updated.

Review: https://reviews.apache.org/r/66126/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/10c3a316
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/10c3a316
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/10c3a316

Branch: refs/heads/1.5.x
Commit: 10c3a31646035fabb2e3b4ad8f2708d777cf7419
Parents: d2289d1
Author: Greg Mann <gregorywm...@gmail.com>
Authored: Thu Apr 5 11:17:35 2018 -0700
Committer: Greg Mann <gregorywm...@gmail.com>
Committed: Thu Apr 5 11:50:24 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 190 +++++++++++++++++++++--------------------
 src/slave/slave.hpp       |   9 +-
 src/tests/mock_slave.cpp  |   8 +-
 src/tests/mock_slave.hpp  |   6 +-
 src/tests/slave_tests.cpp | 146 ++++++++++++++++---------------
 5 files changed, 186 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c0501f8..927d384 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2152,22 +2152,88 @@ void Slave::run(
     }
   }
 
+  auto onUnscheduleGCFailure =
+    [=](const Future<list<bool>>& unschedules) -> Future<list<bool>> {
+      LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
+                 << unschedules.failure();
+
+      Framework* _framework = getFramework(frameworkId);
+      if (_framework == nullptr) {
+        const string error =
+          "Cannot handle unschedule GC failure for " +
+          taskOrTaskGroup(task, taskGroup) + " because the framework " +
+          stringify(frameworkId) + " does not exist";
+
+        LOG(WARNING) << error;
+
+        return Failure(error);
+      }
+
+      // We report TASK_DROPPED to the framework because the task was
+      // never launched. For non-partition-aware frameworks, we report
+      // TASK_LOST for backward compatibility.
+      mesos::TaskState taskState = TASK_DROPPED;
+      if (!protobuf::frameworkHasCapability(
+              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
+      foreach (const TaskInfo& _task, tasks) {
+        _framework->removePendingTask(_task.task_id());
+
+        const StatusUpdate update = protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            _task.task_id(),
+            taskState,
+            TaskStatus::SOURCE_SLAVE,
+            id::UUID::random(),
+            "Could not launch the task because we failed to unschedule"
+            " directories scheduled for gc",
+            TaskStatus::REASON_GC_ERROR);
+
+        // TODO(vinod): Ensure that the task status update manager
+        // reliably delivers this update. Currently, we don't guarantee
+        // this because removal of the framework causes the status
+        // update manager to stop retrying for its un-acked updates.
+        statusUpdate(update, UPID());
+      }
+
+      if (_framework->idle()) {
+        removeFramework(_framework);
+      }
+
+      return unschedules;
+  };
+
   // Run the task after the unschedules are done.
   collect(unschedules)
-    .onAny(defer(self(),
-                 &Self::_run,
-                 lambda::_1,
-                 frameworkInfo,
-                 executorInfo,
-                 task,
-                 taskGroup,
-                 resourceVersionUuids,
-                 launchExecutor));
+    .repair(defer(self(), onUnscheduleGCFailure))
+    .then(defer(
+        self(),
+        &Self::_run,
+        frameworkInfo,
+        executorInfo,
+        task,
+        taskGroup,
+        resourceVersionUuids,
+        launchExecutor))
+    .recover(defer(self(),
+      [=](const Future<Nothing>& future) -> Future<Nothing> {
+        if (launchExecutor.isSome() && launchExecutor.get()) {
+          // Master expects new executor to be launched for this task launch.
+          // To keep the master executor entries updated, the agent needs to
+          // send 'ExitedExecutorMessage' even though no executor launched.
+          sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+        }
+
+        return future;
+      }
+    ));
 }
 
 
-void Slave::_run(
-    const Future<list<bool>>& unschedules,
+Future<Nothing> Slave::_run(
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
@@ -2192,26 +2258,24 @@ void Slave::_run(
   const FrameworkID& frameworkId = frameworkInfo.id();
   Framework* framework = getFramework(frameworkId);
   if (framework == nullptr) {
-    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-                 << " because the framework " << frameworkId
-                 << " does not exist";
+    const string error =
+      "Ignoring running " + taskOrTaskGroup(task, taskGroup) +
+      " because the framework " + stringify(frameworkId) + " does not exist";
 
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
+    LOG(WARNING) << error;
 
-    return;
+    return Failure(error);
   }
 
   // We don't send a status update here because a terminating
   // framework cannot send acknowledgements.
   if (framework->state == Framework::TERMINATING) {
-    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-                 << " of framework " << frameworkId
-                 << " because the framework is terminating";
+    const string error = "Ignoring running " +
+                         taskOrTaskGroup(task, taskGroup) + " of framework " +
+                         stringify(frameworkId) +
+                         " because the framework is terminating";
+
+    LOG(WARNING) << error;
 
     // Although we cannot send a status update in this case, we remove
     // the affected tasks from the pending tasks.
@@ -2223,14 +2287,7 @@ void Slave::_run(
       removeFramework(framework);
     }
 
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
-
-    return;
+    return Failure(error);
   }
 
   // Ignore the launch if killed in the interim. The invariant here
@@ -2251,69 +2308,14 @@ void Slave::_run(
     << " was killed partially";
 
   if (allRemoved) {
-    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-                 << " of framework " << frameworkId
-                 << " because it has been killed in the meantime";
+    const string error = "Ignoring running " +
+                         taskOrTaskGroup(task, taskGroup) + " of framework " +
+                         stringify(frameworkId) +
+                         " because it has been killed in the meantime";
 
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
+    LOG(WARNING) << error;
 
-    return;
-  }
-
-  CHECK(!unschedules.isDiscarded());
-
-  if (!unschedules.isReady()) {
-    LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
-               << (unschedules.isFailed() ?
-                   unschedules.failure() : "future discarded");
-
-    // We report TASK_DROPPED to the framework because the task was
-    // never launched. For non-partition-aware frameworks, we report
-    // TASK_LOST for backward compatibility.
-    mesos::TaskState taskState = TASK_DROPPED;
-    if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-      taskState = TASK_LOST;
-    }
-
-    foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task.task_id());
-
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          taskState,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "Could not launch the task because we failed to unschedule"
-          " directories scheduled for gc",
-          TaskStatus::REASON_GC_ERROR);
-
-      // TODO(vinod): Ensure that the task status update manager
-      // reliably delivers this update. Currently, we don't guarantee
-      // this because removal of the framework causes the status
-      // update manager to stop retrying for its un-acked updates.
-      statusUpdate(update, UPID());
-    }
-
-    if (framework->idle()) {
-      removeFramework(framework);
-    }
-
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
-
-    return;
+    return Failure(error);
   }
 
   // Authorize the task or tasks (as in a task group) to ensure that the
@@ -2338,6 +2340,8 @@ void Slave::_run(
                  taskGroup,
                  resourceVersionUuids,
                  launchExecutor));
+
+  return Nothing();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 75e8ccd..28bbcc4 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -170,8 +170,13 @@ public:
       const Option<bool>& launchExecutor);
 
   // Made 'virtual' for Slave mocking.
-  virtual void _run(
-      const process::Future<std::list<bool>>& unschedules,
+  //
+  // This function returns a future so that we can encapsulate a task(group)
+  // launch operation (from agent receiving the run message to the completion
+  // of `_run()`) into a single future. This includes all the asynchronous
+  // steps (currently two: unschedule GC and task authorization) prior to the
+  // executor launch.
+  virtual process::Future<Nothing> _run(
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,

http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index f73a45f..5e72e82 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -120,7 +120,7 @@ MockSlave::MockSlave(
   // Set up default behaviors, calling the original methods.
   EXPECT_CALL(*this, runTask(_, _, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
-  EXPECT_CALL(*this, _run(_, _, _, _, _, _, _))
+  EXPECT_CALL(*this, _run(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__run));
   EXPECT_CALL(*this, runTaskGroup(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTaskGroup));
@@ -161,8 +161,7 @@ void MockSlave::unmocked_runTask(
 }
 
 
-void MockSlave::unmocked__run(
-    const Future<list<bool>>& unschedules,
+Future<Nothing> MockSlave::unmocked__run(
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& taskInfo,
@@ -170,8 +169,7 @@ void MockSlave::unmocked__run(
     const std::vector<ResourceVersionUUID>& resourceVersionUuids,
     const Option<bool>& launchExecutor)
 {
-  slave::Slave::_run(
-      unschedules,
+  return slave::Slave::_run(
       frameworkInfo,
       executorInfo,
       taskInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/tests/mock_slave.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 42f7d55..600789f 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -119,8 +119,7 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor);
 
-  MOCK_METHOD7(_run, void(
-      const process::Future<std::list<bool>>& unschedules,
+  MOCK_METHOD6(_run, process::Future<Nothing>(
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
@@ -128,8 +127,7 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor));
 
-  void unmocked__run(
-      const process::Future<std::list<bool>>& unschedules,
+  process::Future<Nothing> unmocked__run(
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,

http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 95990c4..95a61cb 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1834,8 +1834,9 @@ TEST_F(SlaveTest, GetStateTaskGroupPending)
   // unmocked `_run()` method. Instead, we want to do nothing so that tasks
   // remain in the framework's 'pending' list.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
-    .WillOnce(FutureSatisfy(&_run));
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+    .WillOnce(DoAll(FutureSatisfy(&_run),
+                    Return(Nothing())));
 
   // The executor should not be launched.
   EXPECT_CALL(*executor, connected(_))
@@ -4114,7 +4115,6 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask));
 
   // Saved arguments from Slave::_run().
-  Future<list<bool>> unschedules;
   FrameworkInfo frameworkInfo;
   ExecutorInfo executorInfo;
   Option<TaskGroupInfo> taskGroup;
@@ -4125,15 +4125,15 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   // later, tie reaching the critical moment when to kill the task to
   // a future.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    SaveArg<0>(&unschedules),
-                    SaveArg<1>(&frameworkInfo),
-                    SaveArg<2>(&executorInfo),
-                    SaveArg<3>(&task_),
-                    SaveArg<4>(&taskGroup),
-                    SaveArg<5>(&resourceVersionUuids),
-                    SaveArg<6>(&launchExecutor)));
+                    SaveArg<0>(&frameworkInfo),
+                    SaveArg<1>(&executorInfo),
+                    SaveArg<2>(&task_),
+                    SaveArg<3>(&taskGroup),
+                    SaveArg<4>(&resourceVersionUuids),
+                    SaveArg<5>(&launchExecutor),
+                    Return(Nothing())));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
@@ -4159,18 +4159,23 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   // since there remain no more tasks.
   AWAIT_READY(removeFramework);
 
-  slave.get()->mock()->unmocked__run(
-      unschedules,
-      frameworkInfo,
-      executorInfo,
-      task_,
-      taskGroup,
-      resourceVersionUuids,
-      launchExecutor);
+  Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        frameworkInfo,
+        executorInfo,
+        task_,
+        taskGroup,
+        resourceVersionUuids,
+        launchExecutor);
+
+    return Nothing();
+  });
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
 
+  AWAIT(unmocked__run);
+
   driver.stop();
   driver.join();
 }
@@ -4245,7 +4250,6 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   // Skip what Slave::_run() normally does, save its arguments for
   // later, tie reaching the critical moment when to kill the task to
   // a future.
-  Future<list<bool>> unschedules1, unschedules2;
   FrameworkInfo frameworkInfo1, frameworkInfo2;
   ExecutorInfo executorInfo1, executorInfo2;
   Option<TaskGroupInfo> taskGroup1, taskGroup2;
@@ -4254,23 +4258,23 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   Option<bool> launchExecutor1, launchExecutor2;
 
   Future<Nothing> _run1, _run2;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run1),
-                    SaveArg<0>(&unschedules1),
-                    SaveArg<1>(&frameworkInfo1),
-                    SaveArg<2>(&executorInfo1),
-                    SaveArg<3>(&task_1),
-                    SaveArg<4>(&taskGroup1),
-                    SaveArg<5>(&resourceVersionUuids1),
-                    SaveArg<6>(&launchExecutor1)))
+                    SaveArg<0>(&frameworkInfo1),
+                    SaveArg<1>(&executorInfo1),
+                    SaveArg<2>(&task_1),
+                    SaveArg<3>(&taskGroup1),
+                    SaveArg<4>(&resourceVersionUuids1),
+                    SaveArg<5>(&launchExecutor1),
+                    Return(Nothing())))
     .WillOnce(DoAll(FutureSatisfy(&_run2),
-                    SaveArg<0>(&unschedules2),
-                    SaveArg<1>(&frameworkInfo2),
-                    SaveArg<2>(&executorInfo2),
-                    SaveArg<3>(&task_2),
-                    SaveArg<4>(&taskGroup2),
-                    SaveArg<5>(&resourceVersionUuids2),
-                    SaveArg<6>(&launchExecutor2)));
+                    SaveArg<0>(&frameworkInfo2),
+                    SaveArg<1>(&executorInfo2),
+                    SaveArg<2>(&task_2),
+                    SaveArg<3>(&taskGroup2),
+                    SaveArg<4>(&resourceVersionUuids2),
+                    SaveArg<5>(&launchExecutor2),
+                    Return(Nothing())));
 
   driver.launchTasks(offers.get()[0].id(), {task1, task2});
 
@@ -4306,23 +4310,25 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   AWAIT_READY(removeFramework);
 
   // The `__run` continuations should have no effect.
-  slave.get()->mock()->unmocked__run(
-      unschedules1,
-      frameworkInfo1,
-      executorInfo1,
-      task_1,
-      taskGroup1,
-      resourceVersionUuids1,
-      launchExecutor1);
-
-  slave.get()->mock()->unmocked__run(
-      unschedules2,
-      frameworkInfo2,
-      executorInfo2,
-      task_2,
-      taskGroup2,
-      resourceVersionUuids2,
-      launchExecutor2);
+  process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        frameworkInfo1,
+        executorInfo1,
+        task_1,
+        taskGroup1,
+        resourceVersionUuids1,
+        launchExecutor1);
+  });
+
+  process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        frameworkInfo2,
+        executorInfo2,
+        task_2,
+        taskGroup2,
+        resourceVersionUuids2,
+        launchExecutor2);
+  });
 
   Clock::settle();
 
@@ -7200,7 +7206,6 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
                      &MockSlave::unmocked_runTaskGroup));
 
   // Saved arguments from `Slave::_run()`.
-  Future<list<bool>> unschedules;
   FrameworkInfo frameworkInfo;
   ExecutorInfo executorInfo_;
   Option<TaskGroupInfo> taskGroup_;
@@ -7212,15 +7217,15 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   // later, till reaching the critical moment when to kill the task
   // in the future.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    SaveArg<0>(&unschedules),
-                    SaveArg<1>(&frameworkInfo),
-                    SaveArg<2>(&executorInfo_),
-                    SaveArg<3>(&task_),
-                    SaveArg<4>(&taskGroup_),
-                    SaveArg<5>(&resourceVersionUuids),
-                    SaveArg<6>(&launchExecutor)));
+                    SaveArg<0>(&frameworkInfo),
+                    SaveArg<1>(&executorInfo_),
+                    SaveArg<2>(&task_),
+                    SaveArg<3>(&taskGroup_),
+                    SaveArg<4>(&resourceVersionUuids),
+                    SaveArg<5>(&launchExecutor),
+                    Return(Nothing())));
 
   const v1::Offer& offer = offers->offers(0);
   const SlaveID slaveId = devolve(offer.agent_id());
@@ -7280,14 +7285,17 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
 
   AWAIT_READY(removeFramework);
 
-  slave.get()->mock()->unmocked__run(
-      unschedules,
-      frameworkInfo,
-      executorInfo_,
-      task_,
-      taskGroup_,
-      resourceVersionUuids,
-      launchExecutor);
+  Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        frameworkInfo,
+        executorInfo_,
+        task_,
+        taskGroup_,
+        resourceVersionUuids,
+        launchExecutor);
+
+    return Nothing();
+  });
 
   AWAIT_READY(update1);
   AWAIT_READY(update2);

Reply via email to