This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 01d4c74f30cf32431c0bd70e5743dcdffab91b0e
Author: Joseph Wu <[email protected]>
AuthorDate: Wed Jan 30 17:32:31 2019 -0800

    Added test for tearing down frameworks while creating disks.

    The CREATE_DISK and DESTROY_DISK operations are "non-speculative"
    operations, which means the master must wait for the operations to
    complete successfully before the master can update its resources.

    Because the master must wait to update the results of
    non-speculative operations, it is possible for the framework making
    the CREATE/DESTROY_DISK to be torn down before the operation
    completes. This commit adds a test to make sure the master can
    gracefully handle such a case.

    Review: https://reviews.apache.org/r/69869
---
 .../storage_local_resource_provider_tests.cpp | 170 +++++++++++++++++++++
 1 file changed, 170 insertions(+)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index a661951..6b38c4f 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -5123,6 +5123,176 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
       "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
 }
 
+
+// This test verifies the master can handle "non-speculative"
+// (i.e. the master cannot assume the success of the operation) operation
+// status updates that originate from frameworks which have been torn down.
+// Non-speculative operations include CREATE_DISK.
+TEST_F(
+    StorageLocalResourceProviderTest,
+    FrameworkTeardownBeforeTerminalOperationStatusUpdate)
+{
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  setupResourceProviderConfig(Gigabytes(4));
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise an operation.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "storage");
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  // NOTE: The scheduler may connect again after the TEARDOWN call.
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo))
+    .WillRepeatedly(Return());
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  // We are only interested in an offer with profiled RAW disk resources.
+  auto isRaw = [](const v1::Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == v1::Resource::DiskInfo::Source::RAW;
+  };
+
+  Future<v1::scheduler::Event::Offers> offers;
+
+  EXPECT_CALL(
+      *scheduler,
+      offers(_, v1::scheduler::OffersHaveAnyResource(isRaw)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  // Prepare to intercept the terminal operation status message between
+  // agent and master, to allow the framework to be torn down first.
+  // This will cause the operation to become an orphan operation.
+  Future<UpdateOperationStatusMessage> updateMessage =
+    DROP_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  ASSERT_FALSE(offer.resources().empty());
+
+  // The test relies on retry logic within the SLRP module to resend the
+  // terminal operation feedback message. We pause the clock here to control
+  // when this retry happens.
+  Clock::pause();
+
+  // Have the framework call CREATE_DISK, a non-speculative operation.
+  Option<v1::Resource> rawDisk;
+
+  foreach (const v1::Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      rawDisk = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(rawDisk);
+
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::CREATE_DISK(
+          rawDisk.get(),
+          v1::Resource::DiskInfo::Source::MOUNT,
+          None(),
+          operationId)}));
+
+  // This message will be sent once the agent has completed the operation.
+  AWAIT_READY(updateMessage);
+
+  EXPECT_EQ(
+      OperationState::OPERATION_FINISHED, updateMessage->status().state());
+  EXPECT_EQ(
+      operationId.value(), updateMessage->status().operation_id().value());
+
+  // We can now tear down the framework, since we are certain the operation
+  // has reached the agent.
+  scheduler.reset();
+
+  {
+    v1::master::Call v1Call;
+    v1Call.set_type(v1::master::Call::TEARDOWN);
+
+    v1::master::Call::Teardown* teardown = v1Call.mutable_teardown();
+
+    teardown->mutable_framework_id()->CopyFrom(frameworkId);
+
+    Future<http::Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(ContentType::PROTOBUF, v1Call),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  }
+
+  // Since the scheduler is gone, the master will acknowledge the operation
+  // status update on its behalf.
+  Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+    FUTURE_PROTOBUF(
+        AcknowledgeOperationStatusMessage(),
+        master.get()->pid,
+        slave.get()->pid);
+
+  // Advance the clock to trigger a resend of the dropped status update.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  AWAIT_READY(acknowledgeOperationStatusMessage);
+
+  Clock::resume();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {
