Tested correct operation handling during master failover. Review: https://reviews.apache.org/r/65045/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c9184a0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c9184a0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c9184a0

Branch: refs/heads/master
Commit: 8c9184a03fa6b6fe842eb3554220d3ed2c327cdc
Parents: c6de89e
Author: Jan Schlicht <j...@mesosphere.io>
Authored: Mon Feb 19 15:15:55 2018 +0100
Committer: Benjamin Bannier <bbann...@apache.org>
Committed: Mon Feb 19 15:15:55 2018 +0100

----------------------------------------------------------------------
 src/tests/master_tests.cpp | 212 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 212 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/8c9184a0/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 28663c7..3705fa7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8743,6 +8743,218 @@ TEST_F(MasterTest, UpdateSlaveMessageWithPendingOffers)
 }


+// Tests that the master correctly handles resource provider operations
+// that finished during a master failover.
+TEST_F(MasterTest, OperationUpdateDuringFailover)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // TODO(nfnt): Remove this once 'MockResourceProvider' supports
+  // authentication.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Add the resource provider capability to the default agent capabilities.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Register a mock resource provider offering a raw disk with the agent.
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test");
+  resourceProviderInfo.set_name("test");
+
+  v1::Resources resourceProviderResources = v1::createDiskResource(
+      "200",
+      "*",
+      None(),
+      None(),
+      v1::createDiskSourceRaw());
+
+  v1::MockResourceProvider resourceProvider(
+      resourceProviderInfo,
+      resourceProviderResources);
+
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  resourceProvider.start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start a framework to operate on offers.
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(&sched, &detector);
+
+  // Expect a registration as well as a re-registration after master
+  // failover.
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(2);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+  const Offer& offer = offers->front();
+
+  Option<Resource> rawDisk;
+
+  foreach (const Resource& resource, offer.resources()) {
+    if (resource.has_provider_id() &&
+        resource.has_disk() &&
+        resource.disk().has_source() &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+      rawDisk = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(rawDisk);
+
+  Future<mesos::v1::resource_provider::Event::ApplyOperation> operation;
+  EXPECT_CALL(resourceProvider, applyOperation(_))
+    .WillOnce(FutureArg<0>(&operation));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE_VOLUME(rawDisk.get(), Resource::DiskInfo::Source::MOUNT)});
+
+  AWAIT_READY(operation);
+
+  Option<mesos::v1::UUID> operationUUID;
+
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_OPERATIONS);
+
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(ContentType::PROTOBUF);
+
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        headers,
+        serialize(ContentType::PROTOBUF, call),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    Try<v1::master::Response> response_ =
+      deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body);
+
+    ASSERT_SOME(response_);
+    const v1::master::Response::GetOperations& operations =
+      response_->get_operations();
+
+    ASSERT_EQ(1, operations.operations_size());
+    EXPECT_EQ(
+        mesos::v1::OperationState::OPERATION_PENDING,
+        operations.operations(0).latest_status().state());
+
+    operationUUID = operations.operations(0).uuid();
+  }
+
+  ASSERT_SOME(operationUUID);
+
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  // Drop the operation update for the finished operation.
+  // As we fail over the master immediately afterwards, we expect
+  // that the operation update will be part of the agent's
+  // `UPDATE_STATE` message when re-registering with the master.
+  Future<UpdateOperationStatusMessage> updateOperationStatusMessage =
+    DROP_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  // Finish the pending operation.
+  resourceProvider.operationDefault(operation.get());
+
+  AWAIT_READY(updateOperationStatusMessage);
+
+  // Fail over the master by destroying the running instance.
+  master->reset();
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillRepeatedly(Return());
+
+  // Restart the master; the reconnecting agent reports the operation.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+  detector.appoint(master.get()->pid);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+
+  AWAIT_READY(updateSlaveMessage);
+
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_OPERATIONS);
+
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(ContentType::PROTOBUF);
+
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        headers,
+        serialize(ContentType::PROTOBUF, call),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    Try<v1::master::Response> response_ =
+      deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body);
+
+    ASSERT_SOME(response_);
+    const v1::master::Response::GetOperations& operations =
+      response_->get_operations();
+
+    ASSERT_EQ(1, operations.operations_size());
+    EXPECT_EQ(
+        mesos::v1::OperationState::OPERATION_FINISHED,
+        operations.operations(0).latest_status().state());
+    EXPECT_EQ(operationUUID.get(), operations.operations(0).uuid());
+  }
+
+  driver.stop();
+  driver.join();
+}
+
+
 class MasterTestPrePostReservationRefinement : public
MasterTest, public WithParamInterface<bool> {