Repository: mesos
Updated Branches:
  refs/heads/1.5.x db71e2f62 -> d59109808


Prevented a crash when an agent with terminal tasks is partitioned.

If an agent is lost, we try to remove all the tasks that might have
been lost. If a task is already terminal but has unacknowledged status
updates, it is expected that we track it in the unreachable tasks list,
so we should remove the CHECK that prevents this. This patch also
changes how unreachable tasks are presented in the HTTP endpoints
so that terminal but unacknowledged tasks are shown in the list of
unreachable tasks and not completed tasks, which is different from
1.4.x, where they are shown as completed.

Review: https://reviews.apache.org/r/64940/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d5910980
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d5910980
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d5910980

Branch: refs/heads/1.5.x
Commit: d59109808443ab2987fd0204d94f9a4e3e84dd9b
Parents: db71e2f
Author: James Peach <jpe...@apache.org>
Authored: Fri Jan 12 13:46:27 2018 -0800
Committer: Jiang Yan Xu <xuj...@apple.com>
Committed: Fri Jan 12 14:53:45 2018 -0800

----------------------------------------------------------------------
 src/master/http.cpp           |  31 +------
 src/master/master.cpp         |   2 -
 src/master/master.hpp         |   2 -
 src/tests/mesos.hpp           |  15 +++-
 src/tests/partition_tests.cpp | 168 +++++++++++++++++++++++++++++++++++--
 5 files changed, 173 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d5910980/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index bc29faf..0c2cd55 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -328,12 +328,6 @@ struct FullFrameworkWriter {
 
     writer->field("unreachable_tasks", [this](JSON::ArrayWriter* writer) {
       foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
-        // There could be TASK_LOST tasks in this map. See comment for
-        // `unreachableTasks`.
-        if (task->state() != TASK_UNREACHABLE) {
-          continue;
-        }
-
         // Skip unauthorized tasks.
         if (!authorizeTask_->accept(*task, framework_->info)) {
           continue;
@@ -352,22 +346,6 @@ struct FullFrameworkWriter {
 
         writer->element(*task);
       }
-
-      // Unreachable tasks belonging to a non-partition-aware framework
-      // have been stored as TASK_LOST for backward compatibility so we
-      // should export them as completed tasks.
-      foreachvalue (const Owned<Task>& task, framework_->unreachableTasks) {
-        if (task->state() != TASK_LOST) {
-          continue;
-        }
-
-        // Skip unauthorized tasks.
-        if (!authorizeTask_->accept(*task, framework_->info)) {
-          continue;
-        }
-
-        writer->element(*task);
-      }
     });
 
     // Model all of the offers associated with a framework.
@@ -4251,14 +4229,7 @@ mesos::master::Response::GetTasks Master::Http::_getTasks(
         continue;
       }
 
-      if (task->state() == TASK_UNREACHABLE) {
-        getTasks.add_unreachable_tasks()->CopyFrom(*task);
-      } else {
-        // Unreachable tasks belonging to a non-partition-aware framework
-        // have been stored as TASK_LOST for backward compatibility so we
-        // should export them as completed tasks.
-        getTasks.add_completed_tasks()->CopyFrom(*task);
-      }
+      getTasks.add_unreachable_tasks()->CopyFrom(*task);
     }
 
     // Completed tasks.

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5910980/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 8050844..a6be5a9 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -10232,8 +10232,6 @@ void Master::removeTask(Task* task, bool unreachable)
   const Resources resources = task->resources();
 
   if (!isRemovable(task->state())) {
-    CHECK(!unreachable) << task->task_id();
-
     // Note that we use `Resources` for output as it's faster than
     // logging raw protobuf data.
     LOG(WARNING) << "Removing task " << task->task_id()

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5910980/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 130f6e2..b1b4e10 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2332,8 +2332,6 @@ struct Framework
     }
 
     if (unreachable) {
-      CHECK(task->state() == TASK_UNREACHABLE || task->state() == TASK_LOST)
-        << task->state();
       addUnreachableTask(*task);
     } else {
       addCompletedTask(Task(*task));

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5910980/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 93913f2..16c75bb 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -3494,21 +3494,30 @@ MATCHER_P(OffersHaveResource, resource, "")
 }
 
 
-// This matcher is used to match the task id of `TaskStatus` message.
+// This matcher is used to match the task id of a `TaskStatus` message.
 MATCHER_P(TaskStatusTaskIdEq, taskInfo, "")
 {
   return arg.task_id() == taskInfo.task_id();
 }
 
 
-// This matcher is used to match the task id of `Event.update.status` message.
+// This matcher is used to match the state of a `TaskStatus` message.
+MATCHER_P(TaskStatusStateEq, taskState, "")
+{
+  return arg.state() == taskState;
+}
+
+
+// This matcher is used to match the task id of an `Event.update.status`
+// message.
 MATCHER_P(TaskStatusUpdateTaskIdEq, taskInfo, "")
 {
   return arg.status().task_id() == taskInfo.task_id();
 }
 
 
-// This matcher is used to match the state of `Event.update.status` message.
+// This matcher is used to match the state of an `Event.update.status`
+// message.
 MATCHER_P(TaskStatusUpdateStateEq, taskState, "")
 {
   return arg.status().state() == taskState;

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5910980/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 7f4b9ed..e7f4ebe 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -69,6 +69,7 @@ using process::http::Response;
 using std::vector;
 
 using testing::_;
+using testing::AllOf;
 using testing::AtMost;
 using testing::DoAll;
 using testing::Eq;
@@ -678,22 +679,30 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
 
     EXPECT_TRUE(runningTasks.values.empty());
 
+    // Although the task state based metrics above show the task in
+    // master/tasks_lost, the state endpoint lists the task in the
+    // "unreachable_tasks" section because it is indeed unreachable
+    // and shouldn't be considered "completed". This difference is
+    // unfortunate and should be addressed in MESOS-8405.
     JSON::Array unreachableTasks =
       framework.values["unreachable_tasks"].as<JSON::Array>();
 
-    EXPECT_TRUE(unreachableTasks.values.empty());
+    EXPECT_FALSE(unreachableTasks.values.empty());
 
-    JSON::Array completedTasks =
-      framework.values["completed_tasks"].as<JSON::Array>();
-
-    JSON::Object completedTask =
-      completedTasks.values.front().as<JSON::Object>();
+    JSON::Object unreachableTask =
+      unreachableTasks.values.front().as<JSON::Object>();
 
+    // The unreachable task is in its backwards-compatible state.
     EXPECT_EQ(
-        task.task_id(), completedTask.values["id"].as<JSON::String>().value);
+        task.task_id(), unreachableTask.values["id"].as<JSON::String>().value);
     EXPECT_EQ(
         "TASK_LOST",
-        completedTask.values["state"].as<JSON::String>().value);
+        unreachableTask.values["state"].as<JSON::String>().value);
+
+    JSON::Array completedTasks =
+      framework.values["completed_tasks"].as<JSON::Array>();
+
+    EXPECT_TRUE(completedTasks.values.empty());
   }
 
   // We now complete the partition on the slave side as well. We
@@ -2396,6 +2405,149 @@ TEST_F(PartitionTest, PartitionAwareTaskCompletedOnPartitionedAgent)
 }
 
 
+// This test verifies that when the master removes a lost agent any
+// unacknowledged but terminal tasks on the agent are tracked as
+// unreachable but keep their original terminal state.
+TEST_F(PartitionTest, AgentWithTerminalTaskPartitioned)
+{
+  // Start a master.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  // We require the completed task to still be tracked on the master when
+  // we lose the agent. By disabling scheduler implicit acknowledgements
+  // and controlling the acknowledgements manually, we ensure that the
+  // master is forced to retain the task state.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched,
+      DEFAULT_FRAMEWORK_INFO,
+      master.get()->pid,
+      false,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillRepeatedly(Return());
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  testing::Sequence taskSequence;
+  Future<TaskStatus> starting;
+  Future<TaskStatus> running;
+  Future<TaskStatus> finished;
+
+  TaskInfo task = createTask(offers->at(0), SLEEP_COMMAND(0));
+
+  EXPECT_CALL(
+      sched,
+      statusUpdate(&driver, AllOf(
+          TaskStatusTaskIdEq(task),
+          TaskStatusStateEq(TASK_STARTING))))
+    .InSequence(taskSequence)
+    .WillOnce(FutureArg<1>(&starting));
+
+  EXPECT_CALL(
+      sched,
+      statusUpdate(&driver, AllOf(
+          TaskStatusTaskIdEq(task),
+          TaskStatusStateEq(TASK_RUNNING))))
+    .InSequence(taskSequence)
+    .WillOnce(FutureArg<1>(&running));
+
+  EXPECT_CALL(
+      sched,
+      statusUpdate(&driver, AllOf(
+          TaskStatusTaskIdEq(task),
+          TaskStatusStateEq(TASK_FINISHED))))
+    .InSequence(taskSequence)
+    .WillOnce(FutureArg<1>(&finished))
+    // Ignore additional status update delivery attempts.
+    .WillRepeatedly(Return());
+
+  Clock::pause();
+
+  driver.launchTasks(offers->at(0).id(), {task});
+
+  AWAIT_READY(starting);
+  driver.acknowledgeStatusUpdate(starting.get());
+
+  AWAIT_READY(running);
+  driver.acknowledgeStatusUpdate(running.get());
+
+  // Wait for the task to finish but don't acknowledge the status
+  // update. This ensures that the master is still tracking the task
+  // when the agent becomes lost.
+  AWAIT_READY(finished);
+
+  // When the agent is lost, the master will rescind any offers and we
+  // can just ignore that.
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .Times(1);
+
+  Future<Nothing> unreachable =
+    FUTURE_DISPATCH(_, &master::Master::markUnreachable);
+
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  // Now fast forward through the ping timeouts so that the agent
+  // becomes unreachable on the master.
+  for (unsigned i = 0; i <= masterFlags.max_agent_ping_timeouts; ++i) {
+    Future<PingSlaveMessage> ping = FUTURE_PROTOBUF(PingSlaveMessage(), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+    AWAIT_READY(ping);
+  }
+
+  AWAIT_READY(unreachable);
+
+  Clock::settle();
+  Clock::resume();
+
+  Future<Response> response = process::http::get(
+      master.get()->pid,
+      "frameworks",
+      None(),
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  Try<JSON::Object> json = JSON::parse<JSON::Object>(response->body);
+  ASSERT_SOME(json);
+
+  // We should not have any active tasks.
+  EXPECT_NONE(json->find<JSON::Number>("frameworks[0].tasks[0]"));
+
+  // The task we launched should be unreachable (since the final status
+  // update wasn't acknowledged) but still terminal. Note that these
+  // expectations are likely to change as part of MESOS-8405.
+  EXPECT_SOME_EQ(
+      task.task_id().value(),
+      json->find<JSON::String>("frameworks[0].unreachable_tasks[0].id"));
+  EXPECT_SOME_EQ(
+      "TASK_FINISHED",
+      json->find<JSON::String>("frameworks[0].unreachable_tasks[0].state"));
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test checks that the master correctly garbage collects
 // information about unreachable agents from the registry using the
 // count-based GC criterion.

Reply via email to