This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit f81427b743789f1e01e9babeb5080f037319af50 Author: Greg Mann <[email protected]> AuthorDate: Tue Jul 16 12:21:11 2019 -0700 Added end-to-end tests for agent draining. This adds a minimal test for the DRAIN_AGENT scheduler call, including verification of master API results once an agent has been drained. A second test verifies the case where an agent is marked gone when draining is complete. Review: https://reviews.apache.org/r/71069/ --- include/mesos/v1/mesos.hpp | 1 + src/tests/api_tests.cpp | 336 +++++++++++++++++++++++++++++++++++++++++++++ src/v1/mesos.cpp | 6 + 3 files changed, 343 insertions(+) diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp index df67f64..d8304f1 100644 --- a/include/mesos/v1/mesos.hpp +++ b/include/mesos/v1/mesos.hpp @@ -52,6 +52,7 @@ bool operator==( const CSIPluginContainerInfo& right); bool operator==(const DiscoveryInfo& left, const DiscoveryInfo& right); +bool operator==(const DrainInfo& left, const DrainInfo& right); bool operator==(const Environment& left, const Environment& right); bool operator==(const ExecutorInfo& left, const ExecutorInfo& right); bool operator==(const FileInfo& left, const FileInfo& right); diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp index af1d215..561ff20 100644 --- a/src/tests/api_tests.cpp +++ b/src/tests/api_tests.cpp @@ -20,6 +20,7 @@ #include <mesos/http.hpp> +#include <mesos/v1/mesos.hpp> #include <mesos/v1/resources.hpp> #include <mesos/v1/resource_provider.hpp> @@ -5448,6 +5449,341 @@ TEST_P(MasterAPITest, OperationUpdatesUponUnreachable) } +// When an operator submits a DRAIN_AGENT call, the agent should kill all +// running tasks. 
+TEST_P(MasterAPITest, DrainAgent) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + slave::Flags agentFlags = CreateSlaveFlags(); + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); + ASSERT_SOME(slave); + + Clock::advance(agentFlags.registration_backoff_factor); + + AWAIT_READY(slaveRegisteredMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + v1::FrameworkInfo::Capability::PARTITION_AWARE); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats.
+ + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); + + auto mesos = std::make_shared<v1::scheduler::TestMesos>( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + Try<v1::Resources> resources = + v1::Resources::parse("cpus:0.1;mem:64;disk:64"); + + ASSERT_SOME(resources); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); + + testing::Sequence updateSequence; + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(Return()); + + mesos->send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(runningUpdate); + + Future<v1::scheduler::Event::Update> killedUpdate; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_KILLED)))) + .WillOnce(DoAll( + FutureArg<1>(&killedUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + Future<Nothing> registrarApplyDrained; +
EXPECT_CALL(*master.get()->registrar, apply(_)) + .WillOnce(DoDefault()) + .WillOnce(DoAll( + FutureSatisfy(&registrarApplyDrained), + Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply))); + + ContentType contentType = GetParam(); + + { + v1::master::Call::DrainAgent drainAgent; + drainAgent.mutable_agent_id()->CopyFrom(agentId); + + v1::master::Call call; + call.set_type(v1::master::Call::DRAIN_AGENT); + call.mutable_drain_agent()->CopyFrom(drainAgent); + + post(master.get()->pid, call, contentType); + } + + AWAIT_READY(killedUpdate); + AWAIT_READY(registrarApplyDrained); + + // Ensure that the update acknowledgement has been processed. + Clock::settle(); + + mesos::v1::DrainInfo drainInfo; + drainInfo.set_state(mesos::v1::DRAINED); + drainInfo.mutable_config()->set_mark_gone(false); + + // Ensure that the agent's drain info is reflected in the master's + // GET_AGENTS response. + { + v1::master::Call call; + call.set_type(v1::master::Call::GET_AGENTS); + + Future<v1::master::Response> response = + post(master.get()->pid, call, contentType); + + AWAIT_READY(response); + ASSERT_TRUE(response->IsInitialized()); + ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type()); + ASSERT_EQ(response->get_agents().agents_size(), 1); + + const v1::master::Response::GetAgents::Agent& agent = + response->get_agents().agents(0); + + EXPECT_EQ(agent.deactivated(), true); + + EXPECT_EQ(agent.drain_info(), drainInfo); + } + + // Ensure that the agent's drain info is reflected in the master's + // '/state' response.
+ { + Future<process::http::Response> response = process::http::get( + master.get()->pid, + "state", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); + ASSERT_SOME(parse); + + Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>( + "slaves[0].drain_info"); + + ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo); + } + + // Ensure that the agent's drain info is reflected in the master's + // '/state-summary' response. + { + Future<process::http::Response> response = process::http::get( + master.get()->pid, + "state-summary", + None(), + createBasicAuthHeaders(DEFAULT_CREDENTIAL)); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); + AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + + Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); + ASSERT_SOME(parse); + + Result<JSON::Object> stateDrainInfo = parse->find<JSON::Object>( + "slaves[0].drain_info"); + + ASSERT_SOME_EQ(JSON::protobuf(drainInfo), stateDrainInfo); + } +} + + +// When an operator submits a DRAIN_AGENT call with 'mark_gone == true', the +// agent should kill all running tasks and the master should mark the agent gone +// once terminal ACKs have been received.
+TEST_P(MasterAPITest, DrainAgentMarkGone) +{ + Clock::pause(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + slave::Flags agentFlags = CreateSlaveFlags(); + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); + ASSERT_SOME(slave); + + Clock::advance(agentFlags.registration_backoff_factor); + + AWAIT_READY(slaveRegisteredMessage); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + v1::FrameworkInfo::Capability::PARTITION_AWARE); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats.
+ + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); + + auto mesos = std::make_shared<v1::scheduler::TestMesos>( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + const v1::AgentID& agentId = offer.agent_id(); + + Try<v1::Resources> resources = + v1::Resources::parse("cpus:0.1;mem:64;disk:64"); + + ASSERT_SOME(resources); + + v1::TaskInfo taskInfo = + v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); + + testing::Sequence updateSequence; + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))) + .WillRepeatedly(Return()); + + mesos->send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(runningUpdate); + + Future<v1::scheduler::Event::Update> goneUpdate; + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_GONE_BY_OPERATOR)))) + .WillOnce(DoAll( + FutureArg<1>(&goneUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + // When the terminal ACK is received by the master,
the agent should be marked + // gone, which entails sending a `ShutdownMessage`. + Future<ShutdownMessage> shutdownMessage = + FUTURE_PROTOBUF(ShutdownMessage(), _, _); + + ContentType contentType = GetParam(); + + { + v1::master::Call::DrainAgent drainAgent; + drainAgent.mutable_agent_id()->CopyFrom(agentId); + drainAgent.set_mark_gone(true); + + v1::master::Call call; + call.set_type(v1::master::Call::DRAIN_AGENT); + call.mutable_drain_agent()->CopyFrom(drainAgent); + + post(master.get()->pid, call, contentType); + } + + AWAIT_READY(goneUpdate); + AWAIT_READY(shutdownMessage); +} + + class AgentAPITest : public MesosTest, public WithParamInterface<ContentType> diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp index be479e3..c2fb528 100644 --- a/src/v1/mesos.cpp +++ b/src/v1/mesos.cpp @@ -146,6 +146,12 @@ bool operator==( } +bool operator==(const DrainInfo& left, const DrainInfo& right) +{ + return google::protobuf::util::MessageDifferencer::Equals(left, right); +} + + bool operator==( const Environment::Variable& left, const Environment::Variable& right)
