This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 1188b131a09e086f3a1510f16fe3053f0ae3a46a
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Thu May 30 15:13:44 2019 -0700

    Fixed chaining futures infinitely in `UriDiskProfileAdaptor`.
    
    Previously it is possible to have an infinite chain of futures when
    `UriDiskProfileAdaptor::watch` is called: if the set of profiles remains
    fixed for every poll, each poll would satisfy a promise that triggers
    an asynchronous recursive call to `UriDiskProfileAdaptor::watch` again.
    
    This patch fixes the problem by removing the asynchronous recursion.
    Instead, we maintain a separated promise for each watcher that is never
    associated to another promise. After each poll, we check if the current
    set of profiles differs from the known set for a watcher, and satisfy
    its own promise if so.
    
    Review: https://reviews.apache.org/r/70766
---
 .../storage/uri_disk_profile_adaptor.cpp           | 55 +++++++++++++++-------
 .../storage/uri_disk_profile_adaptor.hpp           | 22 ++++++---
 2 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/src/resource_provider/storage/uri_disk_profile_adaptor.cpp 
b/src/resource_provider/storage/uri_disk_profile_adaptor.cpp
index 215f7f9..40eae0c 100644
--- a/src/resource_provider/storage/uri_disk_profile_adaptor.cpp
+++ b/src/resource_provider/storage/uri_disk_profile_adaptor.cpp
@@ -16,6 +16,7 @@
 
 #include "resource_provider/storage/uri_disk_profile_adaptor.hpp"
 
+#include <algorithm>
 #include <map>
 #include <string>
 #include <tuple>
@@ -100,8 +101,7 @@ Future<hashset<string>> UriDiskProfileAdaptor::watch(
 UriDiskProfileAdaptorProcess::UriDiskProfileAdaptorProcess(
     const UriDiskProfileAdaptor::Flags& _flags)
   : ProcessBase(ID::generate("uri-disk-profile-adaptor")),
-    flags(_flags),
-    watchPromise(new Promise<Nothing>()) {}
+    flags(_flags) {}
 
 
 void UriDiskProfileAdaptorProcess::initialize()
@@ -140,24 +140,24 @@ Future<hashset<string>> 
UriDiskProfileAdaptorProcess::watch(
     const hashset<string>& knownProfiles,
     const ResourceProviderInfo& resourceProviderInfo)
 {
-  // Calculate the new set of profiles for the resource provider.
-  hashset<string> newProfiles;
-  foreachpair (const string& profile,
-               const ProfileRecord& record,
-               profileMatrix) {
+  // Calculate the current set of profiles for the resource provider.
+  hashset<string> currentProfiles;
+  foreachpair (
+      const string& profile, const ProfileRecord& record, profileMatrix) {
     if (record.active &&
         isSelectedResourceProvider(record.manifest, resourceProviderInfo)) {
-      newProfiles.insert(profile);
+      currentProfiles.insert(profile);
     }
   }
 
-  if (newProfiles != knownProfiles) {
-    return newProfiles;
+  if (currentProfiles != knownProfiles) {
+    return currentProfiles;
   }
 
   // Wait for the next update if there is no change.
-  return watchPromise->future()
-    .then(defer(self(), &Self::watch, knownProfiles, resourceProviderInfo));
+  watchers.emplace_back(knownProfiles, resourceProviderInfo);
+
+  return watchers.back().promise.future();
 }
 
 
@@ -272,12 +272,35 @@ void UriDiskProfileAdaptorProcess::notify(
     profileMatrix.put(entry.first, {entry.second, true});
   }
 
-  // Notify any watchers and then prepare a new promise for the next
-  // iteration of polling.
+  // Notify a watcher if its current set of profiles differs from its known 
set.
   //
   // TODO(josephw): Delay this based on the `--max_random_wait` option.
-  watchPromise->set(Nothing());
-  watchPromise.reset(new Promise<Nothing>());
+  foreach (WatcherData& watcher, watchers) {
+    hashset<string> current;
+    foreachpair (
+        const string& profile, const ProfileRecord& record, profileMatrix) {
+      if (record.active &&
+          isSelectedResourceProvider(record.manifest, watcher.info)) {
+        current.insert(profile);
+      }
+    }
+
+    if (current != watcher.known) {
+      CHECK(watcher.promise.set(current))
+        << "Promise for watcher '" << watcher.info << "' is already "
+        << watcher.promise.future();
+    }
+  }
+
+  // Remove all notified watchers.
+  watchers.erase(
+      std::remove_if(
+          watchers.begin(),
+          watchers.end(),
+          [](const WatcherData& watcher) {
+            return watcher.promise.future().isReady();
+          }),
+      watchers.end());
 
   LOG(INFO)
     << "Updated disk profile mapping to " << parsed.profile_matrix().size()
diff --git a/src/resource_provider/storage/uri_disk_profile_adaptor.hpp 
b/src/resource_provider/storage/uri_disk_profile_adaptor.hpp
index a5a34dc..027ceaa 100644
--- a/src/resource_provider/storage/uri_disk_profile_adaptor.hpp
+++ b/src/resource_provider/storage/uri_disk_profile_adaptor.hpp
@@ -17,6 +17,7 @@
 #ifndef __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__
 #define __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__
 
+#include <list>
 #include <string>
 #include <tuple>
 
@@ -224,10 +225,9 @@ public:
 
 private:
   // Helper that is called upon successfully polling and parsing the `--uri`.
-  // This method will check the following conditions before updating the state
-  // of the module:
-  //   * All known profiles must be included in the updated set.
-  //   * All properties of known profiles must match those in the updated set.
+  // This method will validate that the capability and parameters of a known
+  // profile must remain the same. Then, any watcher will be notified if its 
set
+  // of profiles has been changed.
   void notify(const resource_provider::DiskProfileMapping& parsed);
 
   UriDiskProfileAdaptor::Flags flags;
@@ -247,8 +247,18 @@ private:
   // TODO(josephw): Consider persisting this mapping across agent restarts.
   hashmap<std::string, ProfileRecord> profileMatrix;
 
-  // Will be satisfied whenever `profileMatrix` is changed.
-  process::Owned<process::Promise<Nothing>> watchPromise;
+  struct WatcherData
+  {
+    WatcherData(
+        const hashset<std::string>& _known, const ResourceProviderInfo& _info)
+      : known(_known), info(_info) {}
+
+    hashset<std::string> known;
+    ResourceProviderInfo info;
+    process::Promise<hashset<std::string>> promise;
+  };
+
+  std::vector<WatcherData> watchers;
 };
 
 } // namespace storage {

Reply via email to