This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch 1.6.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit c6da50d10511a10111146b8d4bc563dc3ccee875 Author: Greg Mann <[email protected]> AuthorDate: Tue Apr 23 22:25:29 2019 -0700 Transitioned tasks when an unreachable agent is marked as gone. This patch updates the master code responsible for marking agents as gone to properly transition tasks on agents which were previously marked as unreachable. Review: https://reviews.apache.org/r/70519/ --- src/master/http.cpp | 10 +-- src/master/master.cpp | 100 +++++++++++++++++++++--- src/master/master.hpp | 2 +- src/tests/api_tests.cpp | 196 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 289 insertions(+), 19 deletions(-) diff --git a/src/master/http.cpp b/src/master/http.cpp index 0492b97..103b7f5 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -5225,15 +5225,7 @@ Future<Response> Master::Http::_markAgentGone(const SlaveID& slaveId) const << registrarResult.failure(); } - Slave* slave = master->slaves.registered.get(slaveId); - - // This can happen if the agent that is being marked as - // gone is not currently registered (unreachable/recovered). 
- if (slave == nullptr) { - return; - } - - master->markGone(slave, goneTime); + master->markGone(slaveId, goneTime); })); return gone.then([]() -> Future<Response> { diff --git a/src/master/master.cpp b/src/master/master.cpp index 3b58964..804de69 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -8881,20 +8881,102 @@ void Master::_markUnreachable( } -void Master::markGone(Slave* slave, const TimeInfo& goneTime) +void Master::markGone(const SlaveID& slaveId, const TimeInfo& goneTime) { - CHECK_NOTNULL(slave); - CHECK(slaves.markingGone.contains(slave->info.id())); - slaves.markingGone.erase(slave->info.id()); + CHECK(slaves.markingGone.contains(slaveId)); + + slaves.markingGone.erase(slaveId); + + slaves.gone[slaveId] = goneTime; + + const string message = "Agent has been marked gone"; + + Slave* slave = slaves.registered.get(slaveId); - slaves.gone[slave->id] = goneTime; + // If the `Slave` struct does not exist, then the agent + // must be either recovered or unreachable. + if (slave == nullptr) { + CHECK(slaves.recovered.contains(slaveId) || + slaves.unreachable.contains(slaveId)); + + // When a recovered agent is marked gone, we have no task metadata to use in + // order to send task status updates. We could retain this agent ID and send + // updates upon reregistration but do not currently do this. See MESOS-9739. + if (slaves.recovered.contains(slaveId)) { + return; + } + + slaves.unreachable.erase(slaveId); + + // TODO(vinod): Consider moving these tasks into `completedTasks` by + // transitioning them to a terminal state and sending status updates. + // But it's not clear what this state should be. If a framework + // reconciles these tasks after this point it would get `TASK_UNKNOWN` + // which seems appropriate but we don't keep tasks in this state in-memory. 
+ if (slaves.unreachableTasks.contains(slaveId)) { + foreachkey (const FrameworkID& frameworkId, + slaves.unreachableTasks.at(slaveId)) { + Framework* framework = getFramework(frameworkId); + if (framework == nullptr) { + continue; + } + + TaskState newTaskState = TASK_GONE_BY_OPERATOR; + TaskStatus::Reason newTaskReason = + TaskStatus::REASON_SLAVE_REMOVED_BY_OPERATOR; + + if (!framework->capabilities.partitionAware) { + newTaskState = TASK_LOST; + newTaskReason = TaskStatus::REASON_SLAVE_REMOVED; + } + + foreach (const TaskID& taskId, + slaves.unreachableTasks.at(slaveId).get(frameworkId)) { + if (framework->unreachableTasks.contains(taskId)) { + const Owned<Task>& task = framework->unreachableTasks.at(taskId); + + const StatusUpdate& update = protobuf::createStatusUpdate( + task->framework_id(), + task->slave_id(), + task->task_id(), + newTaskState, + TaskStatus::SOURCE_MASTER, + None(), + message, + newTaskReason, + (task->has_executor_id() + ? Option<ExecutorID>(task->executor_id()) + : None())); + + updateTask(task.get(), update); + + if (!framework->connected()) { + LOG(WARNING) << "Dropping update " << update + << " for disconnected " + << " framework " << frameworkId; + } else { + forward(update, UPID(), framework); + } + + // Move task from unreachable map to completed map. + framework->addCompletedTask(std::move(*task)); + framework->unreachableTasks.erase(taskId); + } + } + } + + slaves.unreachableTasks.erase(slaveId); + } + + return; + } // Shutdown the agent if it transitioned to gone. 
- ShutdownMessage message; - message.set_message("Agent has been marked gone"); - send(slave->pid, message); + ShutdownMessage shutdownMessage; + shutdownMessage.set_message(message); + send(slave->pid, shutdownMessage); - __removeSlave(slave, "Agent has been marked gone", None()); + __removeSlave(slave, message, None()); } diff --git a/src/master/master.hpp b/src/master/master.hpp index c9a3ce2..c72fd41 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -550,7 +550,7 @@ public: bool duringMasterFailover, const std::string& message); - void markGone(Slave* slave, const TimeInfo& goneTime); + void markGone(const SlaveID& slaveId, const TimeInfo& goneTime); void authenticate( const process::UPID& from, diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp index d9d9e9e..8219b6b 100644 --- a/src/tests/api_tests.cpp +++ b/src/tests/api_tests.cpp @@ -101,6 +101,7 @@ using std::tuple; using std::vector; using testing::_; +using testing::AllOf; using testing::AtMost; using testing::DoAll; using testing::Eq; @@ -4674,6 +4675,201 @@ TEST_P(MasterAPITest, TaskUpdatesUponAgentGone) } +// This test verifies that the master correctly sends 'TASK_GONE_BY_OPERATOR' +// status updates and transitions unreachable tasks to completed when an +// unreachable agent which was running the tasks is marked as gone. 
+TEST_P(MasterAPITest, UnreachableAgentMarkedGone) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + slave::Flags agentFlags = CreateSlaveFlags(); + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); + ASSERT_SOME(slave); + + Clock::advance(agentFlags.registration_backoff_factor); + + AWAIT_READY(slaveRegisteredMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + v1::FrameworkInfo::Capability::PARTITION_AWARE); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); + + auto mesos = std::make_shared<v1::scheduler::TestMesos>( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + // Launch a task on this agent so that agent removal will cause + // the master to look for the framework struct. 
+ + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + Try<v1::Resources> resources = + v1::Resources::parse("cpus:0.1;mem:64;disk:64"); + + ASSERT_SOME(resources); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); + + testing::Sequence updateSequence; + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(Return()); + + mesos->send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(runningUpdate); + + Future<v1::scheduler::Event::Update> unreachableUpdate; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo), + TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE)))) + .WillOnce(FutureArg<1>(&unreachableUpdate)); + + EXPECT_CALL(*scheduler, failure(_, _)); + + // Detect pings from master to agent. + Future<process::Message> ping = FUTURE_MESSAGE( + Eq(PingSlaveMessage().GetTypeName()), _, _); + + // Drop all PONGs to simulate agent partition. + DROP_PROTOBUFS(PongSlaveMessage(), _, _); + + // Advance the clock to produce a ping. + Clock::advance(masterFlags.agent_ping_timeout); + + // Now advance through enough pings to mark the agent unreachable. 
+ size_t pings = 0; + while (true) { + AWAIT_READY(ping); + pings++; + if (pings == masterFlags.max_agent_ping_timeouts) { + break; + } + ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _); + Clock::advance(masterFlags.agent_ping_timeout); + } + + Clock::advance(masterFlags.agent_ping_timeout); + + AWAIT_READY(unreachableUpdate); + + Future<v1::scheduler::Event::Update> goneUpdate; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo), + TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR)))) + .WillOnce(FutureArg<1>(&goneUpdate)); + + ContentType contentType = GetParam(); + + // Mark the agent as gone. This should result in the master sending + // a 'TASK_GONE_BY_OPERATOR' update for the running task. + { + v1::master::Call v1Call; + v1Call.set_type(v1::master::Call::MARK_AGENT_GONE); + + v1::master::Call::MarkAgentGone* markAgentGone = + v1Call.mutable_mark_agent_gone(); + + markAgentGone->mutable_agent_id()->CopyFrom(agentId); + + Future<http::Response> response = http::post( + master.get()->pid, + "api/v1", + createBasicAuthHeaders(DEFAULT_CREDENTIAL), + serialize(contentType, v1Call), + stringify(contentType)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response); + } + + AWAIT_READY(goneUpdate); + + // GetState after agent is marked gone to ensure that the previously + // unreachable task has been moved to completed. 
+ { + v1::master::Call v1Call; + v1Call.set_type(v1::master::Call::GET_STATE); + + Future<v1::master::Response> v1Response = + post(master.get()->pid, v1Call, contentType); + + AWAIT_READY(v1Response); + ASSERT_TRUE(v1Response->IsInitialized()); + ASSERT_EQ(v1::master::Response::GET_STATE, v1Response->type()); + + const v1::master::Response::GetState& getState = v1Response->get_state(); + ASSERT_TRUE(getState.get_tasks().unreachable_tasks().empty()); + ASSERT_EQ(1, getState.get_tasks().completed_tasks_size()); + ASSERT_EQ( + taskInfo.task_id(), + getState.get_tasks().completed_tasks(0).task_id()); + } + + Clock::resume(); +} + + class AgentAPITest : public MesosTest, public WithParamInterface<ContentType>
