This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch 1.8.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 1188b131a09e086f3a1510f16fe3053f0ae3a46a Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Thu May 30 15:13:44 2019 -0700 Fixed chaining futures infinitely in `UriDiskProfileAdaptor`. Previously it was possible to have an infinite chain of futures when `UriDiskProfileAdaptor::watch` is called: if the set of profiles remains fixed for every poll, each poll would satisfy a promise that triggers an asynchronous recursive call to `UriDiskProfileAdaptor::watch` again. This patch fixes the problem by removing the asynchronous recursion. Instead, we maintain a separate promise for each watcher that is never associated with another promise. After each poll, we check if the current set of profiles differs from the known set for a watcher, and satisfy its own promise if so. Review: https://reviews.apache.org/r/70766 --- .../storage/uri_disk_profile_adaptor.cpp | 55 +++++++++++++++------- .../storage/uri_disk_profile_adaptor.hpp | 22 ++++++--- 2 files changed, 55 insertions(+), 22 deletions(-) diff --git a/src/resource_provider/storage/uri_disk_profile_adaptor.cpp b/src/resource_provider/storage/uri_disk_profile_adaptor.cpp index 215f7f9..40eae0c 100644 --- a/src/resource_provider/storage/uri_disk_profile_adaptor.cpp +++ b/src/resource_provider/storage/uri_disk_profile_adaptor.cpp @@ -16,6 +16,7 @@ #include "resource_provider/storage/uri_disk_profile_adaptor.hpp" +#include <algorithm> #include <map> #include <string> #include <tuple> @@ -100,8 +101,7 @@ Future<hashset<string>> UriDiskProfileAdaptor::watch( UriDiskProfileAdaptorProcess::UriDiskProfileAdaptorProcess( const UriDiskProfileAdaptor::Flags& _flags) : ProcessBase(ID::generate("uri-disk-profile-adaptor")), - flags(_flags), - watchPromise(new Promise<Nothing>()) {} + flags(_flags) {} void UriDiskProfileAdaptorProcess::initialize() @@ -140,24 +140,24 @@ Future<hashset<string>> UriDiskProfileAdaptorProcess::watch( const hashset<string>& knownProfiles, const ResourceProviderInfo& resourceProviderInfo) { - // 
Calculate the new set of profiles for the resource provider. - hashset<string> newProfiles; - foreachpair (const string& profile, - const ProfileRecord& record, - profileMatrix) { + // Calculate the current set of profiles for the resource provider. + hashset<string> currentProfiles; + foreachpair ( + const string& profile, const ProfileRecord& record, profileMatrix) { if (record.active && isSelectedResourceProvider(record.manifest, resourceProviderInfo)) { - newProfiles.insert(profile); + currentProfiles.insert(profile); } } - if (newProfiles != knownProfiles) { - return newProfiles; + if (currentProfiles != knownProfiles) { + return currentProfiles; } // Wait for the next update if there is no change. - return watchPromise->future() - .then(defer(self(), &Self::watch, knownProfiles, resourceProviderInfo)); + watchers.emplace_back(knownProfiles, resourceProviderInfo); + + return watchers.back().promise.future(); } @@ -272,12 +272,35 @@ void UriDiskProfileAdaptorProcess::notify( profileMatrix.put(entry.first, {entry.second, true}); } - // Notify any watchers and then prepare a new promise for the next - // iteration of polling. + // Notify a watcher if its current set of profiles differs from its known set. // // TODO(josephw): Delay this based on the `--max_random_wait` option. - watchPromise->set(Nothing()); - watchPromise.reset(new Promise<Nothing>()); + foreach (WatcherData& watcher, watchers) { + hashset<string> current; + foreachpair ( + const string& profile, const ProfileRecord& record, profileMatrix) { + if (record.active && + isSelectedResourceProvider(record.manifest, watcher.info)) { + current.insert(profile); + } + } + + if (current != watcher.known) { + CHECK(watcher.promise.set(current)) + << "Promise for watcher '" << watcher.info << "' is already " + << watcher.promise.future(); + } + } + + // Remove all notified watchers. 
+ watchers.erase( + std::remove_if( + watchers.begin(), + watchers.end(), + [](const WatcherData& watcher) { + return watcher.promise.future().isReady(); + }), + watchers.end()); LOG(INFO) << "Updated disk profile mapping to " << parsed.profile_matrix().size() diff --git a/src/resource_provider/storage/uri_disk_profile_adaptor.hpp b/src/resource_provider/storage/uri_disk_profile_adaptor.hpp index a5a34dc..027ceaa 100644 --- a/src/resource_provider/storage/uri_disk_profile_adaptor.hpp +++ b/src/resource_provider/storage/uri_disk_profile_adaptor.hpp @@ -17,6 +17,7 @@ #ifndef __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__ #define __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__ +#include <list> #include <string> #include <tuple> @@ -224,10 +225,9 @@ public: private: // Helper that is called upon successfully polling and parsing the `--uri`. - // This method will check the following conditions before updating the state - // of the module: - // * All known profiles must be included in the updated set. - // * All properties of known profiles must match those in the updated set. + // This method will validate that the capability and parameters of a known + // profile must remain the same. Then, any watcher will be notified if its set + // of profiles has been changed. void notify(const resource_provider::DiskProfileMapping& parsed); UriDiskProfileAdaptor::Flags flags; @@ -247,8 +247,18 @@ private: // TODO(josephw): Consider persisting this mapping across agent restarts. hashmap<std::string, ProfileRecord> profileMatrix; - // Will be satisfied whenever `profileMatrix` is changed. - process::Owned<process::Promise<Nothing>> watchPromise; + struct WatcherData + { + WatcherData( + const hashset<std::string>& _known, const ResourceProviderInfo& _info) + : known(_known), info(_info) {} + + hashset<std::string> known; + ResourceProviderInfo info; + process::Promise<hashset<std::string>> promise; + }; + + std::vector<WatcherData> watchers; }; } // namespace storage {
