This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 8174ccf0983c82e21ca103bc87a5f78d5bff4971 Author: Joseph Wu <[email protected]> AuthorDate: Tue Feb 19 16:41:51 2019 -0800 Added test for terminal operation updates after master failover. This test covers a corner case where an agent reregisters with the master with a pending operation, but the operation's originating framework is unknown. This can occur in a variety of situations like: * the master fails over and a framework never reregisters, * a completed framework is rotated out of the master's memory with pending operations, or * an agent with pending operations is migrated from one cluster to another. In this case, the master should "adopt" the orphan operation only after a delay. This gives the framework some time to reregister. But if the framework does not reregister in time, the master will be in charge of acknowledging operation status updates. Review: https://reviews.apache.org/r/70040 --- .../storage_local_resource_provider_tests.cpp | 191 +++++++++++++++++++++ 1 file changed, 191 insertions(+) diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 6b38c4f..75f0d81 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -5293,6 +5293,197 @@ TEST_F( Clock::resume(); } + +// This test verifies the master will adopt orphan operations reported +// by agents upon reregistration. These orphans can appear whenever +// the master fails over and the operation's originating framework +// never reregisters. Alternatively, and uncommonly, pending operations +// on an agent migrated from one master to another will produce orphans. 
+TEST_F(
+    StorageLocalResourceProviderTest,
+    TerminalOrphanOperationAfterMasterFailover)
+{
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  setupResourceProviderConfig(Gigabytes(4));
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  // Greatly increase the ping timeout of the master/agent connection.
+  // The clock will be advanced by `MIN_WAIT_BEFORE_ORPHAN_OPERATION_ADOPTION`
+  // and if the ping timeout is less than this amount, the agent will
+  // disconnect.
+  masterFlags.agent_ping_timeout =
+    master::DEFAULT_AGENT_PING_TIMEOUT +
+    master::MIN_WAIT_BEFORE_ORPHAN_OPERATION_ADOPTION;
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // NOTE: This test needs to use the StandaloneMasterDetector directly
+  // (instead of the `cluster::Master::createDetector` helper) so that
+  // the restarted master can be detected by the running agent, when the
+  // test dictates.
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise an operation.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "storage");
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  // NOTE: The scheduler may connect again after the master failover.
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo))
+    .WillRepeatedly(Return());
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  // We are only interested in an offer with raw disk resources.
+  auto isRaw = [](const v1::Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == v1::Resource::DiskInfo::Source::RAW;
+  };
+
+  Future<v1::scheduler::Event::Offers> offers;
+
+  EXPECT_CALL(
+      *scheduler,
+      offers(_, v1::scheduler::OffersHaveAnyResource(isRaw)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  // Prepare to intercept the terminal operation status message between
+  // agent and master, to allow the master to failover first.
+  // This creates a situation where the master forgets about the
+  // framework that started the operation.
+  Future<UpdateOperationStatusMessage> updateMessage =
+    DROP_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  ASSERT_FALSE(offer.resources().empty());
+
+  // The test relies on retry logic within the SLRP module to resend the
+  // terminal operation feedback message. We pause the clock here to control
+  // when this retry happens.
+  Clock::pause();
+
+  // Have the framework call CREATE_DISK, a non-speculative operation.
+  Option<v1::Resource> rawDisk;
+
+  foreach (const v1::Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      rawDisk = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(rawDisk);
+
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::CREATE_DISK(
+          rawDisk.get(),
+          v1::Resource::DiskInfo::Source::MOUNT,
+          None(),
+          operationId)}));
+
+  // This message will be sent once the agent has completed the operation.
+  AWAIT_READY(updateMessage);
+
+  EXPECT_EQ(
+      OperationState::OPERATION_FINISHED, updateMessage->status().state());
+  EXPECT_EQ(
+      operationId.value(), updateMessage->status().operation_id().value());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Trigger a master failover to enter a situation where a terminal operation
+  // is orphaned, and the originating framework is not "completed".
+  detector.appoint(None());
+  master->reset();
+
+  // Get rid of the framework so that it does not reregister.
+  // We want the framework to remain unknown after the master/agent recovers.
+  scheduler.reset();
+
+  // Start the master back up and wait for the agent to reregister.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+  detector.appoint(master.get()->pid);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  AWAIT_READY(slaveReregisteredMessage);
+
+  Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+    FUTURE_PROTOBUF(
+        AcknowledgeOperationStatusMessage(),
+        master.get()->pid,
+        slave.get()->pid);
+
+  // Trigger a retry of the operation status update. The master should drop
+  // this one because the agent has just reregistered and the framework is
+  // unknown.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  ASSERT_TRUE(acknowledgeOperationStatusMessage.isPending());
+
+  // Trigger a retry after advancing the clock beyond the operation "adoption"
+  // time, which means the master should acknowledge the terminal status.
+  Clock::advance(master::MIN_WAIT_BEFORE_ORPHAN_OPERATION_ADOPTION);
+  Clock::settle();
+
+  AWAIT_READY(acknowledgeOperationStatusMessage);
+
+  Clock::resume();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {
