Tested that agent resends unacknowledged operation updates on recovery. This patch adds a test to verify that the agent resends unacknowledged operation status updates after a recovery.
Review: https://reviews.apache.org/r/65182/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/336e9321 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/336e9321 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/336e9321 Branch: refs/heads/master Commit: 336e932199643e88c0edbea7c1f08d4b45596389 Parents: 60f23d8 Author: Gaston Kleiman <[email protected]> Authored: Fri Jan 19 15:36:32 2018 -0800 Committer: Greg Mann <[email protected]> Committed: Fri Jan 19 15:50:32 2018 -0800 ---------------------------------------------------------------------- .../storage_local_resource_provider_tests.cpp | 190 +++++++++++++++++++ 1 file changed, 190 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/336e9321/src/tests/storage_local_resource_provider_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index f6d093a..5ac9180 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -2077,6 +2077,196 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_RetryOperationStatusUpdate) driver.join(); } + +// This test verifies that on agent restarts, unacknowledged operation status +// updates are resent to the master. +// +// To accomplish this: +// 1. Creates a volume from a RAW disk resource. +// 2. Drops the first `UpdateOperationStatusMessage` from the agent to the +// master, so that it isn't acknowledged by the master. +// 3. Restarts the agent. +// 4. Verifies that the agent resends the operation status update. 
+TEST_F(
+    StorageLocalResourceProviderTest,
+    ROOT_RetryOperationStatusUpdateAfterRecovery)
+{
+  // Start with a paused clock; the test moves time forward explicitly with
+  // `Clock::advance` and only resumes it where real time has to pass.
+  Clock::pause();
+
+  // Fixture helpers defined outside this hunk. Presumably they load the URI
+  // disk profile adaptor module and write the resource provider and disk
+  // profile configuration that the agent flags below point at -- TODO confirm.
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  flags.agent_features = SlaveCapabilities();
+  flags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(flags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  // The second update must report exactly one resource provider.
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // Register a framework to exercise an operation.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+
+  // Predicate matching disk resources that carry a profile and whose source
+  // is of type RAW.
+  auto isRaw = [](
+      const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+  };
+
+  // Capture the offers from the first `resourceOffers` call accepted by the
+  // `OffersHaveAnyResource` matcher (presumably: offers containing at least
+  // one resource for which `isRaw` holds -- the matcher is defined elsewhere).
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isRaw, lambda::_1))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  const Offer& offer = offers->at(0);
+
+  // Use the first RAW disk resource in the offer as the source of the volume
+  // created below.
+  Option<Resource> source;
+  foreach (const Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      source = resource;
+      break;
+    }
+  }
+  ASSERT_SOME(source);
+
+  // We'll drop the first operation status update from the agent to the master.
+  Future<UpdateOperationStatusMessage> droppedUpdateOperationStatusMessage =
+    DROP_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  // Create a volume.
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      {});
+
+  // Wait for the status update to be sent (and dropped) before restarting
+  // the agent, so that the update is still unacknowledged at that point.
+  AWAIT_READY(droppedUpdateOperationStatusMessage);
+
+  // Restart the agent.
+  slave.get()->terminate();
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  //
+  // NOTE: As with the first pair of futures in this test, the two
+  // `FUTURE_PROTOBUF`s are declared in reverse order.
+  Future<UpdateSlaveMessage> updateSlave4 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave3 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // Once the agent is restarted, the SLRP should resend the dropped operation
+  // status update.
+  Future<UpdateOperationStatusMessage> retriedUpdateOperationStatusMessage =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, master.get()->pid);
+
+  // The master should acknowledge the operation status update once.
+  Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+    FUTURE_PROTOBUF(AcknowledgeOperationStatusMessage(), master.get()->pid, _);
+
+  // Decline offers without RAW disk resources, the master can send such offers
+  // once it receives the first `UpdateSlaveMessage` after the agent failover,
+  // or after receiving the `UpdateOperationStatusMessage`.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  // Start the agent again with the same detector and flags as before.
+  slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(flags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave3);
+
+  // Resume the clock so that the CSI plugin's standalone container is created
+  // and the SLRP's async loop notices it.
+  Clock::resume();
+
+  // As before the restart, exactly one resource provider must be reported.
+  AWAIT_READY(updateSlave4);
+  ASSERT_TRUE(updateSlave4->has_resource_providers());
+  ASSERT_EQ(1, updateSlave4->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // The update dropped before the restart must be sent again and then
+  // acknowledged by the master.
+  AWAIT_READY(retriedUpdateOperationStatusMessage);
+
+  AWAIT_READY(acknowledgeOperationStatusMessage);
+
+  // The master has acknowledged the operation status update, so the SLRP
+  // shouldn't send further operation status updates.
+  EXPECT_NO_FUTURE_PROTOBUFS(UpdateOperationStatusMessage(), _, _);
+
+  // Move time forward by the minimum status update retry interval and let
+  // pending events settle; presumably a retry that was still scheduled would
+  // fire here and violate the expectation above.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {
