This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 4f078398f7010d982a1c4ee95a1e3f628813e6fe Author: Joseph Wu <[email protected]> AuthorDate: Mon Jul 29 19:43:31 2019 -0700 Refactored master draining test setup. Tests of this feature will generally require a master, agent, framework, and a single task to be launched at the beginning of the test. This moves this common code into the test SetUp. This also changes the `post(...)` helper to return the http::Response object instead of parsing it. The response for DRAIN_AGENT calls does not return an object, so the tests were not checking the response before. Review: https://reviews.apache.org/r/71315 --- src/tests/master_draining_tests.cpp | 494 +++++++++++++----------------------- 1 file changed, 175 insertions(+), 319 deletions(-) diff --git a/src/tests/master_draining_tests.cpp b/src/tests/master_draining_tests.cpp index 16d0c85..eae809f 100644 --- a/src/tests/master_draining_tests.cpp +++ b/src/tests/master_draining_tests.cpp @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include <memory> #include <string> #include <mesos/http.hpp> @@ -73,6 +74,130 @@ class MasterDrainingTest public WithParamInterface<ContentType> { public: + // Creates a master, agent, framework, and launches one sleep task. + void SetUp() override + { + MesosTest::SetUp(); + + Clock::pause(); + + // Create the master. + masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> _master = StartMaster(masterFlags); + ASSERT_SOME(_master); + master = _master.get(); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + // Create the agent. + agentFlags = CreateSlaveFlags(); + detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> _slave = StartSlave(detector.get(), agentFlags); + ASSERT_SOME(_slave); + slave = _slave.get(); + + Clock::advance(agentFlags.registration_backoff_factor); + AWAIT_READY(slaveRegisteredMessage); + + // Create the framework. + scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_checkpoint(true); + frameworkInfo.add_capabilities()->set_type( + v1::FrameworkInfo::Capability::PARTITION_AWARE); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); + + mesos = std::make_shared<v1::scheduler::TestMesos>( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(subscribed); + frameworkId = subscribed->framework_id(); + + // Launch a sleep task. + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + agentId = offer.agent_id(); + + Try<v1::Resources> resources = + v1::Resources::parse("cpus:0.1;mem:64;disk:64"); + + ASSERT_SOME(resources); + + taskInfo = v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); + + testing::Sequence updateSequence; + Future<v1::scheduler::Event::Update> startingUpdate; + Future<v1::scheduler::Event::Update> runningUpdate; + + // Make sure the agent receives these two acknowledgements. + Future<StatusUpdateAcknowledgementMessage> startingAck = + FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); + Future<StatusUpdateAcknowledgementMessage> runningAck = + FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_STARTING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&startingUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + EXPECT_CALL( + *scheduler, + update(_, AllOf( + TaskStatusUpdateTaskIdEq(taskInfo.task_id()), + TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) + .InSequence(updateSequence) + .WillOnce(DoAll( + FutureArg<1>(&runningUpdate), + v1::scheduler::SendAcknowledge(frameworkId, agentId))); + + mesos->send( + v1::createCallAccept( + frameworkId, + offer, + {v1::LAUNCH({taskInfo})})); + + AWAIT_READY(startingUpdate); + AWAIT_READY(startingAck); + AWAIT_READY(runningUpdate); + AWAIT_READY(runningAck); + } + + void TearDown() override + { + mesos.reset(); + scheduler.reset(); + slave.reset(); + detector.reset(); + master.reset(); + + Clock::resume(); + + MesosTest::TearDown(); + } + master::Flags CreateMasterFlags() override { // Turn off periodic allocations to avoid the race between @@ -84,13 +209,12 @@ public: // Helper function to post a request to "/api/v1" master endpoint and return // the response. - Future<v1::master::Response> post( + Future<http::Response> post( const process::PID<master::Master>& pid, const v1::master::Call& call, - const ContentType& contentType, - const Credential& credential = DEFAULT_CREDENTIAL) + const ContentType& contentType) { - http::Headers headers = createBasicAuthHeaders(credential); + http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL); headers["Accept"] = stringify(contentType); return http::post( @@ -98,15 +222,24 @@ public: "api/v1", headers, serialize(contentType, call), - stringify(contentType)) - .then([contentType](const http::Response& response) - -> Future<v1::master::Response> { - if (response.status != http::OK().status) { - return Failure("Unexpected response status " + response.status); - } - return deserialize<v1::master::Response>(contentType, response.body); - }); + stringify(contentType)); } + +protected: + master::Flags masterFlags; + Owned<cluster::Master> master; + Owned<MasterDetector> detector; + + slave::Flags agentFlags; + Owned<cluster::Slave> slave; + v1::AgentID agentId; + + std::shared_ptr<v1::MockHTTPScheduler> scheduler; + v1::FrameworkInfo frameworkInfo; + std::shared_ptr<v1::scheduler::TestMesos> mesos; + v1::FrameworkID frameworkId; + + v1::TaskInfo taskInfo; }; @@ -121,99 +254,6 @@ INSTANTIATE_TEST_CASE_P( // running tasks. TEST_P(MasterDrainingTest, DrainAgent) { - Clock::pause(); - - master::Flags masterFlags = CreateMasterFlags(); - Try<Owned<cluster::Master>> master = StartMaster(masterFlags); - ASSERT_SOME(master); - - Future<SlaveRegisteredMessage> slaveRegisteredMessage = - FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); - - slave::Flags agentFlags = CreateSlaveFlags(); - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); - ASSERT_SOME(slave); - - Clock::advance(agentFlags.registration_backoff_factor); - - AWAIT_READY(slaveRegisteredMessage); - - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.add_capabilities()->set_type( - v1::FrameworkInfo::Capability::PARTITION_AWARE); - - EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); - - Future<v1::scheduler::Event::Subscribed> subscribed; - EXPECT_CALL(*scheduler, subscribed(_, _)) - .WillOnce(FutureArg<1>(&subscribed)); - - EXPECT_CALL(*scheduler, heartbeat(_)) - .WillRepeatedly(Return()); // Ignore heartbeats. - - Future<v1::scheduler::Event::Offers> offers; - EXPECT_CALL(*scheduler, offers(_, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); - - auto mesos = std::make_shared<v1::scheduler::TestMesos>( - master.get()->pid, ContentType::PROTOBUF, scheduler); - - AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->offers().empty()); - - const v1::Offer& offer = offers->offers(0); - const v1::AgentID& agentId = offer.agent_id(); - - Try<v1::Resources> resources = - v1::Resources::parse("cpus:0.1;mem:64;disk:64"); - - ASSERT_SOME(resources); - - v1::TaskInfo taskInfo = - v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); - - testing::Sequence updateSequence; - Future<v1::scheduler::Event::Update> startingUpdate; - Future<v1::scheduler::Event::Update> runningUpdate; - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_STARTING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&startingUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&runningUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) - .WillRepeatedly(Return()); - - mesos->send( - v1::createCallAccept( - frameworkId, - offer, - {v1::LAUNCH({taskInfo})})); - - AWAIT_READY(startingUpdate); - AWAIT_READY(runningUpdate); - Future<v1::scheduler::Event::Update> killedUpdate; EXPECT_CALL( *scheduler, @@ -225,11 +265,11 @@ TEST_P(MasterDrainingTest, DrainAgent) v1::scheduler::SendAcknowledge(frameworkId, agentId))); Future<Nothing> registrarApplyDrained; - EXPECT_CALL(*master.get()->registrar, apply(_)) + EXPECT_CALL(*master->registrar, apply(_)) .WillOnce(DoDefault()) .WillOnce(DoAll( FutureSatisfy(®istrarApplyDrained), - Invoke(master.get()->registrar.get(), &MockRegistrar::unmocked_apply))); + Invoke(master->registrar.get(), &MockRegistrar::unmocked_apply))); ContentType contentType = GetParam(); @@ -242,7 +282,9 @@ TEST_P(MasterDrainingTest, DrainAgent) call.set_type(v1::master::Call::DRAIN_AGENT); call.mutable_drain_agent()->CopyFrom(drainAgent); - post(master.get()->pid, call, contentType); + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::OK().status, + post(master->pid, call, contentType)); } AWAIT_READY(killedUpdate); @@ -262,16 +304,19 @@ TEST_P(MasterDrainingTest, DrainAgent) v1::master::Call call; call.set_type(v1::master::Call::GET_AGENTS); - Future<v1::master::Response> response = - post(master.get()->pid, call, contentType); + Future<http::Response> response = + post(master->pid, call, contentType); + AWAIT_ASSERT_RESPONSE_STATUS_EQ(http::OK().status, response); - AWAIT_READY(response); - ASSERT_TRUE(response->IsInitialized()); - ASSERT_EQ(v1::master::Response::GET_AGENTS, response->type()); - ASSERT_EQ(response->get_agents().agents_size(), 1); + Try<v1::master::Response> getAgents = + deserialize<v1::master::Response>(contentType, response->body); + ASSERT_SOME(getAgents); + + ASSERT_EQ(v1::master::Response::GET_AGENTS, getAgents->type()); + ASSERT_EQ(getAgents->get_agents().agents_size(), 1); const v1::master::Response::GetAgents::Agent& agent = - response->get_agents().agents(0); + getAgents->get_agents().agents(0); EXPECT_EQ(agent.deactivated(), true); @@ -283,13 +328,13 @@ TEST_P(MasterDrainingTest, DrainAgent) // '/state' response. { Future<process::http::Response> response = process::http::get( - master.get()->pid, + master->pid, "state", None(), createBasicAuthHeaders(DEFAULT_CREDENTIAL)); - AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response); - AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); + AWAIT_ASSERT_RESPONSE_STATUS_EQ(process::http::OK().status, response); + AWAIT_ASSERT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response); Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body); ASSERT_SOME(parse); @@ -310,7 +355,7 @@ TEST_P(MasterDrainingTest, DrainAgent) // '/state-summary' response. { Future<process::http::Response> response = process::http::get( - master.get()->pid, + master->pid, "state-summary", None(), createBasicAuthHeaders(DEFAULT_CREDENTIAL)); @@ -340,99 +385,6 @@ TEST_P(MasterDrainingTest, DrainAgent) // once terminal ACKs have been received. TEST_P(MasterDrainingTest, DrainAgentMarkGone) { - Clock::pause(); - - master::Flags masterFlags = CreateMasterFlags(); - Try<Owned<cluster::Master>> master = StartMaster(masterFlags); - ASSERT_SOME(master); - - Future<SlaveRegisteredMessage> slaveRegisteredMessage = - FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); - - slave::Flags agentFlags = CreateSlaveFlags(); - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); - ASSERT_SOME(slave); - - Clock::advance(agentFlags.registration_backoff_factor); - - AWAIT_READY(slaveRegisteredMessage); - - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.add_capabilities()->set_type( - v1::FrameworkInfo::Capability::PARTITION_AWARE); - - EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); - - Future<v1::scheduler::Event::Subscribed> subscribed; - EXPECT_CALL(*scheduler, subscribed(_, _)) - .WillOnce(FutureArg<1>(&subscribed)); - - EXPECT_CALL(*scheduler, heartbeat(_)) - .WillRepeatedly(Return()); // Ignore heartbeats. - - Future<v1::scheduler::Event::Offers> offers; - EXPECT_CALL(*scheduler, offers(_, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); - - auto mesos = std::make_shared<v1::scheduler::TestMesos>( - master.get()->pid, ContentType::PROTOBUF, scheduler); - - AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->offers().empty()); - - const v1::Offer& offer = offers->offers(0); - const v1::AgentID& agentId = offer.agent_id(); - - Try<v1::Resources> resources = - v1::Resources::parse("cpus:0.1;mem:64;disk:64"); - - ASSERT_SOME(resources); - - v1::TaskInfo taskInfo = - v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); - - testing::Sequence updateSequence; - Future<v1::scheduler::Event::Update> startingUpdate; - Future<v1::scheduler::Event::Update> runningUpdate; - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_STARTING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&startingUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&runningUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))) - .WillRepeatedly(Return()); - - mesos->send( - v1::createCallAccept( - frameworkId, - offer, - {v1::LAUNCH({taskInfo})})); - - AWAIT_READY(startingUpdate); - AWAIT_READY(runningUpdate); - Future<v1::scheduler::Event::Update> goneUpdate; EXPECT_CALL( *scheduler, @@ -459,7 +411,9 @@ TEST_P(MasterDrainingTest, DrainAgentMarkGone) call.set_type(v1::master::Call::DRAIN_AGENT); call.mutable_drain_agent()->CopyFrom(drainAgent); - post(master.get()->pid, call, contentType); + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::OK().status, + post(master->pid, call, contentType)); } AWAIT_READY(goneUpdate); @@ -472,100 +426,11 @@ TEST_P(MasterDrainingTest, DrainAgentMarkGone) // if/when it returns to the cluster. TEST_P(MasterDrainingTest, DrainAgentUnreachable) { - Clock::pause(); - - master::Flags masterFlags = CreateMasterFlags(); - Try<Owned<cluster::Master>> master = StartMaster(masterFlags); - ASSERT_SOME(master); - - Future<SlaveRegisteredMessage> slaveRegisteredMessage = - FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); - - slave::Flags agentFlags = CreateSlaveFlags(); - Owned<MasterDetector> detector = master.get()->createDetector(); - Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), agentFlags); - ASSERT_SOME(slave); - - Clock::advance(agentFlags.registration_backoff_factor); - - AWAIT_READY(slaveRegisteredMessage); - - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.set_checkpoint(true); - frameworkInfo.add_capabilities()->set_type( - v1::FrameworkInfo::Capability::PARTITION_AWARE); - - EXPECT_CALL(*scheduler, connected(_)) - .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); - - Future<v1::scheduler::Event::Subscribed> subscribed; - EXPECT_CALL(*scheduler, subscribed(_, _)) - .WillOnce(FutureArg<1>(&subscribed)); - - EXPECT_CALL(*scheduler, heartbeat(_)) - .WillRepeatedly(Return()); // Ignore heartbeats. - - Future<v1::scheduler::Event::Offers> offers; - EXPECT_CALL(*scheduler, offers(_, _)) - .WillOnce(FutureArg<1>(&offers)) - .WillRepeatedly(Return()); - - auto mesos = std::make_shared<v1::scheduler::TestMesos>( - master.get()->pid, ContentType::PROTOBUF, scheduler); - - AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - - AWAIT_READY(offers); - ASSERT_FALSE(offers->offers().empty()); - - const v1::Offer& offer = offers->offers(0); - const v1::AgentID& agentId = offer.agent_id(); - - Try<v1::Resources> resources = - v1::Resources::parse("cpus:0.1;mem:64;disk:64"); - - ASSERT_SOME(resources); - - v1::TaskInfo taskInfo = - v1::createTask(agentId, resources.get(), SLEEP_COMMAND(1000)); - testing::Sequence updateSequence; - Future<v1::scheduler::Event::Update> startingUpdate; - Future<v1::scheduler::Event::Update> runningUpdate; Future<v1::scheduler::Event::Update> unreachableUpdate; - Future<v1::scheduler::Event::Update> runningUpdate2; + Future<v1::scheduler::Event::Update> runningUpdate; Future<v1::scheduler::Event::Update> killedUpdate; - // Make absolutely sure the agent receives these two acknowledgements - // before forcing the agent offline. - Future<StatusUpdateAcknowledgementMessage> startingAck = - FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); - Future<StatusUpdateAcknowledgementMessage> runningAck = - FUTURE_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, _); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_STARTING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&startingUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - - EXPECT_CALL( - *scheduler, - update(_, AllOf( - TaskStatusUpdateTaskIdEq(taskInfo.task_id()), - TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) - .InSequence(updateSequence) - .WillOnce(DoAll( - FutureArg<1>(&runningUpdate), - v1::scheduler::SendAcknowledge(frameworkId, agentId))); - EXPECT_CALL( *scheduler, update(_, AllOf( @@ -585,7 +450,7 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable) TaskStatusUpdateStateEq(v1::TASK_RUNNING)))) .InSequence(updateSequence) .WillOnce(DoAll( - FutureArg<1>(&runningUpdate2), + FutureArg<1>(&runningUpdate), v1::scheduler::SendAcknowledge(frameworkId, agentId))); EXPECT_CALL( @@ -598,20 +463,9 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable) FutureArg<1>(&killedUpdate), v1::scheduler::SendAcknowledge(frameworkId, agentId))); - mesos->send( - v1::createCallAccept( - frameworkId, - offer, - {v1::LAUNCH({taskInfo})})); - - AWAIT_READY(startingUpdate); - AWAIT_READY(startingAck); - AWAIT_READY(runningUpdate); - AWAIT_READY(runningAck); - // Simulate an agent crash, so that it disconnects from the master. - slave.get()->terminate(); - slave->reset(); + slave->terminate(); + slave.reset(); Clock::advance(masterFlags.agent_reregister_timeout); AWAIT_READY(unreachableUpdate); @@ -627,7 +481,9 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable) call.set_type(v1::master::Call::DRAIN_AGENT); call.mutable_drain_agent()->CopyFrom(drainAgent); - post(master.get()->pid, call, contentType); + AWAIT_EXPECT_RESPONSE_STATUS_EQ( + http::OK().status, + post(master->pid, call, contentType)); } // Bring the agent back. @@ -653,7 +509,7 @@ TEST_P(MasterDrainingTest, DrainAgentUnreachable) // The agent should be told to drain once it reregisters. AWAIT_READY(drainSlaveMesage); - AWAIT_READY(runningUpdate2); + AWAIT_READY(runningUpdate); AWAIT_READY(killedUpdate); }
