This is an automated email from the ASF dual-hosted git repository. bbannier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 266616aa6fe0027b07af80157bf7293735609d54 Author: Benjamin Bannier <[email protected]> AuthorDate: Wed Aug 14 09:33:37 2019 +0200 Factored out storage provider method to update resources. Review: https://reviews.apache.org/r/71150/ --- src/resource_provider/storage/provider.cpp | 120 ++++++++++----------- .../storage_local_resource_provider_tests.cpp | 11 +- 2 files changed, 64 insertions(+), 67 deletions(-) diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 71aa650..2f91fe0 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -273,6 +273,10 @@ private: // truth, such as CSI plugin responses or the status update manager. Future<Nothing> reconcileResourceProviderState(); Future<Nothing> reconcileOperationStatuses(); + + // Query the plugin for its resources and update the providers state. + Future<Nothing> reconcileResources(); + ResourceConversion computeConversion( const Resources& checkpointed, const Resources& discovered) const; @@ -295,10 +299,6 @@ private: // set of profiles it knows about. Future<Nothing> updateProfiles(const hashset<string>& profiles); - // Reconcile the storage pools when the set of known profiles changes, - // or a volume with an unknown profile is destroyed. - Future<Nothing> reconcileStoragePools(); - // Returns true if the storage pools are allowed to be reconciled when // the operation is being applied. static bool allowsReconciliation(const Offer::Operation& operation); @@ -716,39 +716,63 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileResourceProviderState() { return reconcileOperationStatuses() - .then(defer(self(), [=] { - return collect<vector<ResourceConversion>>( - {getExistingVolumes(), getStoragePools()}) - .then(defer(self(), [=]( - const vector<vector<ResourceConversion>>& collected) { + .then(defer(self(), &Self::reconcileResources)); +} + + +Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources() +{ + LOG(INFO) << "Reconciling storage pools and volumes"; + + return collect<vector<ResourceConversion>>( + {getExistingVolumes(), getStoragePools()}) + .then(defer( + self(), [this](const vector<vector<ResourceConversion>>& collected) { Resources result = totalResources; foreach (const vector<ResourceConversion>& conversions, collected) { result = CHECK_NOTERROR(result.apply(conversions)); } + bool shouldSendUpdate = false; + if (result != totalResources) { - LOG(INFO) - << "Removing '" << (totalResources - result) << "' and adding '" - << (result - totalResources) << "' to the total resources"; + LOG(INFO) << "Removing '" << (totalResources - result) + << "' and adding '" << (result - totalResources) + << "' to the total resources"; + // Update the resource version since the total resources changed. totalResources = result; + resourceVersion = id::UUID::random(); + checkpointResourceProviderState(); + + shouldSendUpdate = true; } - // NOTE: Since this is the first `UPDATE_STATE` call of the - // current subscription, there must be no racing speculative - // operation, thus no need to update the resource version. - sendResourceProviderStateUpdate(); - statusUpdateManager.resume(); + switch (state) { + case RECOVERING: + case DISCONNECTED: + case CONNECTED: + case SUBSCRIBED: { + LOG(INFO) << "Resource provider " << info.id() + << " is in READY state"; - LOG(INFO) - << "Resource provider " << info.id() << " is in READY state"; + state = READY; + + // This is the first resource update of the current subscription. + shouldSendUpdate = true; + } + case READY: + break; + } - state = READY; + if (shouldSendUpdate) { + sendResourceProviderStateUpdate(); + statusUpdateManager.resume(); + } return Nothing(); })); - })); } @@ -913,44 +937,6 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses() } -Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools() -{ - CHECK_PENDING(reconciled); - - auto die = [=](const string& message) { - LOG(ERROR) - << "Failed to reconcile storage pools for resource provider " << info.id() - << ": " << message; - fatal(); - }; - - return getStoragePools() - .then(defer(self(), [=](const vector<ResourceConversion>& conversions) { - Resources result = CHECK_NOTERROR(totalResources.apply(conversions)); - - if (result != totalResources) { - LOG(INFO) - << "Removing '" << (totalResources - result) << "' and adding '" - << (result - totalResources) << "' to the total resources"; - - totalResources = result; - checkpointResourceProviderState(); - - // NOTE: We always update the resource version before sending - // an `UPDATE_STATE`, so that any racing speculative operation - // will be rejected. Otherwise, the speculative resource - // conversion done on the master will be cancelled out. - resourceVersion = id::UUID::random(); - sendResourceProviderStateUpdate(); - } - - return Nothing(); - })) - .onFailed(defer(self(), std::bind(die, lambda::_1))) - .onDiscarded(defer(self(), std::bind(die, "future discarded"))); -} - - bool StorageLocalResourceProviderProcess::allowsReconciliation( const Offer::Operation& operation) { @@ -1182,7 +1168,7 @@ void StorageLocalResourceProviderProcess::watchProfiles() std::function<Future<Nothing>()> update = defer(self(), [=] { return updateProfiles(profiles) - .then(defer(self(), &Self::reconcileStoragePools)); + .then(defer(self(), &Self::reconcileResources)); }); // Update the profile mapping and storage pools in `sequence` to wait @@ -1871,8 +1857,18 @@ StorageLocalResourceProviderProcess::applyDestroyDisk( // pending operation that disallow reconciliation to finish, and set // up `reconciled` to drop incoming operations that disallow // reconciliation until the storage pools are reconciled. - reconciled = sequence.add(std::function<Future<Nothing>()>( - defer(self(), &Self::reconcileStoragePools))); + auto err = [](const Resource& resource, const string& message) { + LOG(ERROR) + << "Failed to reconcile storage pools after resource " + << "'" << resource << "' has been freed: " << message; + }; + + reconciled = + sequence + .add(std::function<Future<Nothing>()>( + defer(self(), &Self::reconcileResources))) + .onFailed(std::bind(err, resource, lambda::_1)) + .onDiscard(std::bind(err, resource, "future discarded")); } } } else { diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index f27f5c1..7624ea1 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -5129,12 +5129,13 @@ TEST_P(StorageLocalResourceProviderTest, CsiPluginRpcMetrics) ASSERT_SOME(source); // We expect that the following RPC calls are made during startup: `Probe`, - // `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`, - // `ListVolumes`, `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`. + // `GetPluginInfo` (2), `GetCapacity`, `GetPluginCapabilities, + // `ControllerGetCapabilities`, `ListVolumes` (2), `NodeGetCapabilities`, + // `NodeGetId`. // // TODO(chhsiao): As these are implementation details, we should count the // calls processed by a mock CSI plugin and check the metrics against that. - const int numFinishedStartupRpcs = 9; + const int numFinishedStartupRpcs = 10; EXPECT_TRUE(metricEquals( metricName("csi_plugin/rpcs_finished"), numFinishedStartupRpcs)); @@ -5870,11 +5871,11 @@ TEST_P(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) // We expect that the following RPC calls are made during startup: `Probe`, // `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`, - // `ListVolumes`, `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`. + // `ListVolumes` (2), `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`. // // TODO(chhsiao): As these are implementation details, we should count the // calls processed by a mock CSI plugin and check the metrics against that. - const int numFinishedStartupRpcs = 9; + const int numFinishedStartupRpcs = 10; EXPECT_TRUE(metricEquals( metricName("csi_plugin/rpcs_finished"), numFinishedStartupRpcs));
