Tested that operation status updates dropped en route to the master are resent. This patch adds `StorageLocalResourceProviderTest.ROOT_RetryOperationStatusUpdate`, which verifies that operation status updates are resent by the agent after being dropped en route to the master.
Review: https://reviews.apache.org/r/65057/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/60f23d87 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/60f23d87 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/60f23d87 Branch: refs/heads/master Commit: 60f23d870080c5d70963857cb06a50cf0d2825fb Parents: 434ef5f Author: Gaston Kleiman <[email protected]> Authored: Fri Jan 19 15:36:31 2018 -0800 Committer: Greg Mann <[email protected]> Committed: Fri Jan 19 15:50:32 2018 -0800 ---------------------------------------------------------------------- .../storage_local_resource_provider_tests.cpp | 161 +++++++++++++++++++ 1 file changed, 161 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/60f23d87/src/tests/storage_local_resource_provider_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 1b21527..f6d093a 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -1916,6 +1916,167 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume) } } + +// This test verifies that operation status updates are resent to the master +// after being dropped en route to it. +// +// To accomplish this: +// 1. Creates a volume from a RAW disk resource. +// 2. Drops the first `UpdateOperationStatusMessage` from the agent to the +// master, so that it isn't acknowledged by the master. +// 3. Advances the clock and verifies that the agent resends the operation +// status update. 
+TEST_F(StorageLocalResourceProviderTest, ROOT_RetryOperationStatusUpdate) +{ + Clock::pause(); + + loadUriDiskProfileModule(); + + setupResourceProviderConfig(Gigabytes(4)); + setupDiskProfileConfig(); + + master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + slave::Flags flags = CreateSlaveFlags(); + flags.isolation = "filesystem/linux"; + + // Disable HTTP authentication to simplify resource provider interactions. + flags.authenticate_http_readwrite = false; + + // Set the resource provider capability. + vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES(); + SlaveInfo::Capability capability; + capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER); + capabilities.push_back(capability); + + flags.agent_features = SlaveCapabilities(); + flags.agent_features->mutable_capabilities()->CopyFrom( + {capabilities.begin(), capabilities.end()}); + + flags.resource_provider_config_dir = resourceProviderConfigDir; + flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME; + + // Since the local resource provider daemon is started after the agent + // is registered, it is guaranteed that the slave will send two + // `UpdateSlaveMessage`s, where the latter one contains resources from + // the storage local resource provider. + // + // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because + // Google Mock will search the expectations in reverse order. + Future<UpdateSlaveMessage> updateSlave2 = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + Future<UpdateSlaveMessage> updateSlave1 = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); + ASSERT_SOME(slave); + + // Advance the clock to trigger agent registration. 
+ Clock::advance(flags.registration_backoff_factor); + + AWAIT_READY(updateSlave1); + + // NOTE: We need to resume the clock so that the resource provider can + // periodically check if the CSI endpoint socket has been created by + // the plugin container, which runs in another Linux process. + Clock::resume(); + + AWAIT_READY(updateSlave2); + ASSERT_TRUE(updateSlave2->has_resource_providers()); + ASSERT_EQ(1, updateSlave2->resource_providers().providers_size()); + + Clock::pause(); + + // Register a framework to exercise an operation. + FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + framework.set_roles(0, "storage"); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer>> offers; + + auto isRaw = []( + const Resource& r) { + return r.has_disk() && + r.disk().has_source() && + r.disk().source().has_profile() && + r.disk().source().type() == Resource::DiskInfo::Source::RAW; + }; + + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isRaw, lambda::_1)))) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + + const Offer& offer = offers->at(0); + + Option<Resource> source; + foreach (const Resource& resource, offer.resources()) { + if (isRaw(resource)) { + source = resource; + break; + } + } + ASSERT_SOME(source); + + // We'll drop the first operation status update from the agent to the master. + Future<UpdateOperationStatusMessage> droppedUpdateOperationStatusMessage = + DROP_PROTOBUF( + UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid); + + // Create a volume. 
+ driver.acceptOffers( + {offer.id()}, + {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)}, + {}); + + AWAIT_READY(droppedUpdateOperationStatusMessage); + + // The SLRP should resend the dropped operation status update after the + // status update retry interval minimum. + Future<UpdateOperationStatusMessage> retriedUpdateOperationStatusMessage = + FUTURE_PROTOBUF( + UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid); + + // The master should acknowledge the operation status update. + Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage = + FUTURE_PROTOBUF( + AcknowledgeOperationStatusMessage(), master.get()->pid, slave.get()->pid); + + Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); + + AWAIT_READY(retriedUpdateOperationStatusMessage); + AWAIT_READY(acknowledgeOperationStatusMessage); + + // The master acknowledged the operation status update, so the SLRP shouldn't + // send further operation status updates. + EXPECT_NO_FUTURE_PROTOBUFS(UpdateOperationStatusMessage(), _, _); + + // The master received the `UpdateOperationStatusMessage`, so it can now + // offer the `MOUNT` disk - no further offers are needed, so they can be + // declined. + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(DeclineOffers()); + + Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); + Clock::settle(); + + driver.stop(); + driver.join(); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
