This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 61f1155675bd3bc5312e0501ea6182d2ee7434af
Author: Greg Mann <[email protected]>
AuthorDate: Tue Apr 23 22:25:29 2019 -0700

    Transitioned tasks when an unreachable agent is marked as gone.
    
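    This patch updates the master code responsible for marking
    agents as gone to properly transition tasks on agents which
    were previously marked as unreachable: such tasks now receive a
    terminal status update (TASK_GONE_BY_OPERATOR, or TASK_LOST for
    frameworks that are not partition-aware) and are moved to the
    framework's completed tasks.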
    
    Review: https://reviews.apache.org/r/70519/
---
 src/master/http.cpp     |  10 +--
 src/master/master.cpp   | 100 +++++++++++++++++++++---
 src/master/master.hpp   |   2 +-
 src/tests/api_tests.cpp | 196 ++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 289 insertions(+), 19 deletions(-)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index e2773ed..30dddc1 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -5331,15 +5331,7 @@ Future<Response> Master::Http::_markAgentGone(const SlaveID& slaveId) const
                  << registrarResult.failure();
     }
 
-    Slave* slave = master->slaves.registered.get(slaveId);
-
-    // This can happen if the agent that is being marked as
-    // gone is not currently registered (unreachable/recovered).
-    if (slave == nullptr) {
-      return;
-    }
-
-    master->markGone(slave, goneTime);
+    master->markGone(slaveId, goneTime);
   }));
 
   return gone.then([]() -> Future<Response> {
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 08a5133..1a95b69 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8968,20 +8968,102 @@ void Master::_markUnreachable(
 }
 
 
-void Master::markGone(Slave* slave, const TimeInfo& goneTime)
+void Master::markGone(const SlaveID& slaveId, const TimeInfo& goneTime)
 {
-  CHECK_NOTNULL(slave);
-  CHECK(slaves.markingGone.contains(slave->info.id()));
-  slaves.markingGone.erase(slave->info.id());
+  CHECK(slaves.markingGone.contains(slaveId));
+
+  slaves.markingGone.erase(slaveId);
+
+  slaves.gone[slaveId] = goneTime;
+
+  const string message = "Agent has been marked gone";
+
+  Slave* slave = slaves.registered.get(slaveId);

-  slaves.gone[slave->id] = goneTime;
+  // If the `Slave` struct does not exist, then the agent
+  // must be either recovered or unreachable.
+  if (slave == nullptr) {
+    CHECK(slaves.recovered.contains(slaveId) ||
+          slaves.unreachable.contains(slaveId));
+
+    // When a recovered agent is marked gone, we have no task metadata to use in
+    // order to send task status updates. We could retain this agent ID and send
+    // updates upon reregistration but do not currently do this. See MESOS-9739.
+    if (slaves.recovered.contains(slaveId)) {
+      return;
+    }
+
+    slaves.unreachable.erase(slaveId);
+
+    // Transition each task on this unreachable agent to a terminal state,
+    // forward the update to its framework (if connected), and move the task
+    // to the framework's completed tasks. Partition-aware frameworks receive
+    // `TASK_GONE_BY_OPERATOR`, others `TASK_LOST`. Tasks of frameworks that
+    // the master does not know about are dropped without an update.
+    if (slaves.unreachableTasks.contains(slaveId)) {
+      foreachkey (const FrameworkID& frameworkId,
+                  slaves.unreachableTasks.at(slaveId)) {
+        Framework* framework = getFramework(frameworkId);
+        if (framework == nullptr) {
+          continue;
+        }
+
+        TaskState newTaskState = TASK_GONE_BY_OPERATOR;
+        TaskStatus::Reason newTaskReason =
+          TaskStatus::REASON_SLAVE_REMOVED_BY_OPERATOR;
+
+        if (!framework->capabilities.partitionAware) {
+          newTaskState = TASK_LOST;
+          newTaskReason = TaskStatus::REASON_SLAVE_REMOVED;
+        }
+
+        foreach (const TaskID& taskId,
+                 slaves.unreachableTasks.at(slaveId).get(frameworkId)) {
+          if (framework->unreachableTasks.contains(taskId)) {
+            const Owned<Task>& task = framework->unreachableTasks.at(taskId);
+
+            const StatusUpdate& update = protobuf::createStatusUpdate(
+                task->framework_id(),
+                task->slave_id(),
+                task->task_id(),
+                newTaskState,
+                TaskStatus::SOURCE_MASTER,
+                None(),
+                message,
+                newTaskReason,
+                (task->has_executor_id()
+                   ? Option<ExecutorID>(task->executor_id())
+                   : None()));
+
+            updateTask(task.get(), update);
+
+            if (!framework->connected()) {
+              LOG(WARNING) << "Dropping update " << update
+                           << " for disconnected"
+                           << " framework " << frameworkId;
+            } else {
+              forward(update, UPID(), framework);
+            }
+
+            // Move task from unreachable map to completed map.
+            framework->addCompletedTask(std::move(*task));
+            framework->unreachableTasks.erase(taskId);
+          }
+        }
+      }
+
+      slaves.unreachableTasks.erase(slaveId);
+    }
+
+    return;
+  }

   // Shutdown the agent if it transitioned to gone.
-  ShutdownMessage message;
-  message.set_message("Agent has been marked gone");
-  send(slave->pid, message);
+  ShutdownMessage shutdownMessage;
+  shutdownMessage.set_message(message);
+  send(slave->pid, shutdownMessage);

-  __removeSlave(slave, "Agent has been marked gone", None());
+  __removeSlave(slave, message, None());
 }
 
 
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6830e3b..b5d82ee 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -564,7 +564,7 @@ public:
       bool duringMasterFailover,
       const std::string& message);
 
-  void markGone(Slave* slave, const TimeInfo& goneTime);
+  void markGone(const SlaveID& slaveId, const TimeInfo& goneTime);
 
   void authenticate(
       const process::UPID& from,
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 0cfc8e3..d3ac3ba 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -101,6 +101,7 @@ using std::tuple;
 using std::vector;
 
 using testing::_;
+using testing::AllOf;
 using testing::AtMost;
 using testing::DoAll;
 using testing::Eq;
@@ -4717,6 +4718,201 @@ TEST_P(MasterAPITest, TaskUpdatesUponAgentGone)
 }
 
 
+// This test verifies that the master correctly sends 'TASK_GONE_BY_OPERATOR'
+// status updates and transitions unreachable tasks to completed when an
+// unreachable agent which was running the tasks is marked as gone.
+TEST_P(MasterAPITest, UnreachableAgentMarkedGone)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  // Launch a task on this agent so that, once the agent becomes unreachable,
+  // the master holds an unreachable task for the gone transition to handle.
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  Try<v1::Resources> resources =
+    v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+  ASSERT_SOME(resources);
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+  testing::Sequence updateSequence;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&startingUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo),
+            TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(updateSequence)
+    .WillOnce(DoAll(
+        FutureArg<1>(&runningUpdate),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return());
+
+  mesos->send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH({taskInfo})}));
+
+  AWAIT_READY(startingUpdate);
+  AWAIT_READY(runningUpdate);
+
+  Future<v1::scheduler::Event::Update> unreachableUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo),
+            TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
+    .WillOnce(FutureArg<1>(&unreachableUpdate));
+
+  EXPECT_CALL(*scheduler, failure(_, _));
+
+  // Detect pings from master to agent.
+  Future<process::Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  // Drop all PONGs to simulate agent partition.
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  // Advance the clock to produce a ping.
+  Clock::advance(masterFlags.agent_ping_timeout);
+
+  // Now advance through enough pings to mark the agent unreachable.
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+
+  AWAIT_READY(unreachableUpdate);
+
+  Future<v1::scheduler::Event::Update> goneUpdate;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+            TaskStatusUpdateTaskIdEq(taskInfo),
+            TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR))))
+    .WillOnce(FutureArg<1>(&goneUpdate));
+
+  ContentType contentType = GetParam();
+
+  // Mark the agent as gone. This should result in the master sending
+  // a 'TASK_GONE_BY_OPERATOR' update for the unreachable task.
+  {
+    v1::master::Call v1Call;
+    v1Call.set_type(v1::master::Call::MARK_AGENT_GONE);
+
+    v1::master::Call::MarkAgentGone* markAgentGone =
+      v1Call.mutable_mark_agent_gone();
+
+    markAgentGone->mutable_agent_id()->CopyFrom(agentId);
+
+    Future<http::Response> response = http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(contentType, v1Call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  }
+
+  AWAIT_READY(goneUpdate);
+
+  // GetState after agent is marked gone to ensure that the previously
+  // unreachable task has been moved to completed.
+  {
+    v1::master::Call v1Call;
+    v1Call.set_type(v1::master::Call::GET_STATE);
+
+    Future<v1::master::Response> v1Response =
+      post(master.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type());
+
+    const v1::master::Response::GetState& getState = v1Response->get_state();
+    ASSERT_TRUE(getState.get_tasks().unreachable_tasks().empty());
+    ASSERT_EQ(1, getState.get_tasks().completed_tasks_size());
+    ASSERT_EQ(
+        taskInfo.task_id(),
+        getState.get_tasks().completed_tasks(0).task_id());
+  }
+
+  Clock::resume();
+}
+
+
 class AgentAPITest
   : public MesosTest,
     public WithParamInterface<ContentType>

Reply via email to