This is an automated email from the ASF dual-hosted git repository.
josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 6fcbd99 Fixed agent draining when returning from unreachable state.
6fcbd99 is described below
commit 6fcbd99108b19c8dd1cf7a0f45a59d48d7aacd7e
Author: Joseph Wu <[email protected]>
AuthorDate: Mon Aug 12 18:36:59 2019 -0700
Fixed agent draining when returning from unreachable state.
This logic was missing from the initial implementation of agent
draining. When an agent became unreachable, and then reregistered
with the master, the master would not properly deactivate or drain
the agent.
This also fixes a potential problem with checking the agent
drain state too early in the case of pending operations.
Operations are not reported to the master until after the agent
reregisters, so agents with the RESOURCE_PROVIDER capability
cannot be considered DRAINED until after the first
UpdateSlaveMessage has arrived.
Review: https://reviews.apache.org/r/71275
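In outline, the master now re-applies deactivation and resends the drain
configuration when such an agent reregisters. A simplified sketch of the new
reregistration logic follows (the exact code is in the master.cpp hunks below):

    // Simplified sketch of the reregistration path after this change.
    if (slaves.deactivated.contains(slaveInfo.id())) {
      deactivate(slave);                       // Re-apply a prior deactivation.
    }

    if (slaves.draining.contains(slaveInfo.id())) {
      DrainSlaveMessage message;
      message.mutable_config()->CopyFrom(
          slaves.draining.at(slaveInfo.id()).config());
      send(slave->pid, message);               // Resend the drain config.

      // Agents with the RESOURCE_PROVIDER capability only report pending
      // operations in their first UpdateSlaveMessage, so the DRAINED check
      // is deferred to Master::updateSlave() for them.
      if (!slaveCapabilities.resourceProvider) {
        checkAndTransitionDrainingAgent(slave);
      }
    }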
---
src/master/master.cpp | 43 ++++++++++-
src/tests/api_tests.cpp | 191 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 231 insertions(+), 3 deletions(-)
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 31c7c97..599f62d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7938,6 +7938,30 @@ void Master::__reregisterSlave(
addSlave(slave, std::move(completedFrameworks));
+ // If this agent was deactivated, make sure to deactivate it again,
+ // now that it has reregistered.
+ if (slaves.deactivated.contains(slaveInfo.id())) {
+ deactivate(slave);
+ }
+
+ // If this is a draining agent, send it the drain message.
+ // We do this regardless of the draining state (DRAINING or DRAINED),
+ // because the agent is expected to handle the message in either state.
+ if (slaves.draining.contains(slaveInfo.id())) {
+ DrainSlaveMessage message;
+ message.mutable_config()->CopyFrom(
+ slaves.draining.at(slaveInfo.id()).config());
+
+ send(slave->pid, message);
+
+ // NOTE: If the agent supports resource providers, the agent will not
+ // report pending operations until the agent's first UpdateSlaveMessage.
+ // Therefore, we cannot check the agent's drain state here.
+ if (!slaveCapabilities.resourceProvider) {
+ checkAndTransitionDrainingAgent(slave);
+ }
+ }
+
Duration pingTimeout =
flags.agent_ping_timeout * flags.max_agent_ping_timeouts;
MasterSlaveConnection connection;
@@ -8049,6 +8073,7 @@ void Master::___reregisterSlave(
const string& version = reregisterSlaveMessage.version();
const vector<SlaveInfo::Capability> agentCapabilities =
google::protobuf::convert(reregisterSlaveMessage.agent_capabilities());
+ protobuf::slave::Capabilities slaveCapabilities(agentCapabilities);
Option<UUID> resourceVersion;
if (reregisterSlaveMessage.has_resource_version_uuid()) {
@@ -8132,9 +8157,12 @@ void Master::___reregisterSlave(
send(slave->pid, message);
- // Check if the agent is already drained and transition it
- // appropriately if so.
- checkAndTransitionDrainingAgent(slave);
+ // NOTE: If the agent supports resource providers, the agent will not
+ // report pending operations until the agent's first UpdateSlaveMessage.
+ // Therefore, we cannot check the agent's drain state here.
+ if (!slaveCapabilities.resourceProvider) {
+ checkAndTransitionDrainingAgent(slave);
+ }
}
// Inform the agent of the new framework pids for its tasks, and
@@ -8457,6 +8485,11 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
LOG(INFO) << "Ignoring update on agent " << *slave
<< " as it reports no changes";
+ // NOTE: This is necessary to catch draining agents with the
+ // RESOURCE_PROVIDER capability, since the master will not know about
+ // any pending operations until the first UpdateSlaveMessage has been
+ // sent after reregistration.
+ checkAndTransitionDrainingAgent(slave);
return;
}
@@ -8823,6 +8856,10 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
// NOTE: We don't need to rescind inverse offers here as they are unrelated to
// oversubscription.
+
+ // Now that we have the agent's operations in master memory, we can check
+ // if the agent is drained and transition it appropriately if so.
+ checkAndTransitionDrainingAgent(slave);
}
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 36fb7b0..e202cd3 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -5820,6 +5820,197 @@ TEST_P(MasterAPITest, DrainAgentMarkGone)
}
+// When an operator submits a DRAIN_AGENT call for an agent that has gone
+// unreachable, the call should succeed, and the agent should be drained
+// if/when it returns to the cluster.
+TEST_P(MasterAPITest, DrainAgentUnreachable)
+{
+ Clock::pause();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ slave::Flags agentFlags = CreateSlaveFlags();
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags);
+ ASSERT_SOME(slave);
+
+ Clock::advance(agentFlags.registration_backoff_factor);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+ frameworkInfo.add_capabilities()->set_type(
+ v1::FrameworkInfo::Capability::PARTITION_AWARE);
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ auto mesos = std::make_shared<v1::scheduler::TestMesos>(
+ master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+ AWAIT_READY(subscribed);
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID& agentId = offer.agent_id();
+
+ Try<v1::Resources> resources =
+ v1::Resources::parse("cpus:0.1;mem:64;disk:64");
+
+ ASSERT_SOME(resources);
+
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000));
+
+ testing::Sequence updateSequence;
+ Future<v1::scheduler::Event::Update> startingUpdate;
+ Future<v1::scheduler::Event::Update> runningUpdate;
+ Future<v1::scheduler::Event::Update> unreachableUpdate;
+ Future<v1::scheduler::Event::Update> runningUpdate2;
+ Future<v1::scheduler::Event::Update> killedUpdate;
+
+ // Make absolutely sure the agent receives these two acknowledgements
+ // before forcing the agent offline.
+ Future<StatusUpdateAcknowledgementMessage> startingAck =
+ FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+ Future<StatusUpdateAcknowledgementMessage> runningAck =
+ FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _);
+
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+ .InSequence(updateSequence)
+ .WillOnce(DoAll(
+ FutureArg<1>(&startingUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+ .InSequence(updateSequence)
+ .WillOnce(DoAll(
+ FutureArg<1>(&runningUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_UNREACHABLE))))
+ .InSequence(updateSequence)
+ .WillOnce(DoAll(
+ FutureArg<1>(&unreachableUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ // When the agent is brought back, we expect a TASK_RUNNING followed by
+ // a TASK_KILLED (due to draining).
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+ .InSequence(updateSequence)
+ .WillOnce(DoAll(
+ FutureArg<1>(&runningUpdate2),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ EXPECT_CALL(
+ *scheduler,
+ update(_, AllOf(
+ TaskStatusUpdateTaskIdEq(taskInfo.task_id()),
+ TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+ .InSequence(updateSequence)
+ .WillOnce(DoAll(
+ FutureArg<1>(&killedUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+ mesos->send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH({taskInfo})}));
+
+ AWAIT_READY(startingUpdate);
+ AWAIT_READY(startingAck);
+ AWAIT_READY(runningUpdate);
+ AWAIT_READY(runningAck);
+
+ // Simulate an agent crash, so that it disconnects from the master.
+ slave.get()->terminate();
+ slave->reset();
+
+ Clock::advance(masterFlags.agent_reregister_timeout);
+ AWAIT_READY(unreachableUpdate);
+
+ // Start draining the unreachable agent.
+ ContentType contentType = GetParam();
+
+ {
+ v1::master::Call::DrainAgent drainAgent;
+ drainAgent.mutable_agent_id()->CopyFrom(agentId);
+
+ v1::master::Call call;
+ call.set_type(v1::master::Call::DRAIN_AGENT);
+ call.mutable_drain_agent()->CopyFrom(drainAgent);
+
+ post(master.get()->pid, call, contentType);
+ }
+
+ // Bring the agent back.
+ Future<ReregisterExecutorMessage> reregisterExecutor =
+ FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ Future<DrainSlaveMessage> drainSlaveMessage =
+ FUTURE_PROTOBUF(DrainSlaveMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> recoveredSlave =
+ StartSlave(detector.get(), agentFlags);
+ ASSERT_SOME(recoveredSlave);
+
+ AWAIT_READY(reregisterExecutor);
+ Clock::advance(agentFlags.executor_reregistration_timeout);
+ Clock::settle();
+ Clock::advance(agentFlags.registration_backoff_factor);
+ Clock::settle();
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // The agent should be told to drain once it reregisters.
+ AWAIT_READY(drainSlaveMessage);
+ AWAIT_READY(runningUpdate2);
+ AWAIT_READY(killedUpdate);
+}
+
+
class AgentAPITest
: public MesosTest,
public WithParamInterface<ContentType>