Updated Branches:
  refs/heads/master 280895fbb -> ec8a16962

Added a WeakFuture<T>.

Using a WeakFuture<T> fixes memory leaks in Future::then and
Promise::associate.

From: Jie Yu <[email protected]>
Review: https://reviews.apache.org/r/16197


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ec8a1696
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ec8a1696
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ec8a1696

Branch: refs/heads/master
Commit: ec8a16962a401c15cd28f1247f0777f1ea76fc6c
Parents: 280895f
Author: Benjamin Hindman <[email protected]>
Authored: Thu Dec 12 15:01:03 2013 -0800
Committer: Benjamin Hindman <[email protected]>
Committed: Thu Dec 12 15:01:04 2013 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 117 ++++++++++++++------
 1 file changed, 82 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ec8a1696/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index e473b3d..4122b96 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -19,6 +19,7 @@
 
 #include <stout/duration.hpp>
 #include <stout/error.hpp>
+#include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/preprocessor.hpp>
 
@@ -43,6 +44,11 @@ template <typename T>
 class Promise;
 
 
+// Forward declaration of WeakFuture.
+template <typename T>
+class WeakFuture;
+
+
 // Definition of a "shared" future. A future can hold any
 // copy-constructible value. A future is considered "shared" because
 // by default a future can be accessed concurrently.
@@ -215,17 +221,7 @@ public:
 
 private:
   friend class Promise<T>;
-
-  // Sets the value for this future, unless the future is already set,
-  // failed, or discarded, in which case it returns false.
-  bool set(const T& _t);
-
-  // Sets this future as failed, unless the future is already set,
-  // failed, or discarded, in which case it returns false.
-  bool fail(const std::string& _message);
-
-  void copy(const Future<T>& that);
-  void cleanup();
+  friend class WeakFuture<T>;
 
   enum State
   {
@@ -251,10 +247,70 @@ private:
     std::queue<AnyCallback> onAnyCallbacks;
   };
 
+  // Sets the value for this future, unless the future is already set,
+  // failed, or discarded, in which case it returns false.
+  bool set(const T& _t);
+
+  // Sets this future as failed, unless the future is already set,
+  // failed, or discarded, in which case it returns false.
+  bool fail(const std::string& _message);
+
   std::tr1::shared_ptr<Data> data;
 };
 
 
+// Represents a weak reference to a future. This class is used to
+// break cyclic dependencies between futures.
+template <typename T>
+class WeakFuture
+{
+public:
+  WeakFuture(const Future<T>& future);
+
+  // Converts this weak reference to a concrete future. Returns none
+  // if the conversion is not successful.
+  Option<Future<T> > get();
+
+private:
+  std::tr1::weak_ptr<typename Future<T>::Data> data;
+};
+
+
+template <typename T>
+WeakFuture<T>::WeakFuture(const Future<T>& future)
+  : data(future.data) {}
+
+
+template <typename T>
+Option<Future<T> > WeakFuture<T>::get()
+{
+  Future<T> future;
+  future.data = data.lock();
+
+  if (future.data) {
+    return future;
+  } else {
+    return None();
+  }
+}
+
+
+namespace internal {
+
+// Discards a weak future. If the weak future is invalid (i.e., the
+// future it refers to has already been destroyed), this operation
+// is treated as a no-op.
+template <typename T>
+void discard(WeakFuture<T> reference)
+{
+  Option<Future<T> > future = reference.get();
+  if (future.isSome()) {
+    future.get().discard();
+  }
+}
+
+} // namespace internal {
+
 // Helper for creating failed futures.
 struct _Failure
 {
@@ -351,8 +407,9 @@ bool Promise<T>::associate(const Future<T>& future)
   // future gets discarded, the other future will also get discarded.
   // For 'set' and 'fail', they are associated only in one direction.
   // In other words, calling 'set' or 'fail' on this promise will not
-  // affect the result of the future that we associated.
-  f.onDiscarded(std::tr1::bind(&Future<T>::discard, future));
+  // affect the result of the future that we associated. To avoid
+  // cyclic dependencies, we keep a weak future in the callback.
+  f.onDiscarded(std::tr1::bind(&internal::discard<T>, WeakFuture<T>(future)));
 
   if (!f.isPending()) {
     return false;
@@ -866,15 +923,10 @@ Future<X> Future<T>::then(const std::tr1::function<Future<X>(const T&)>& f) cons
 
   onAny(thenf);
 
-  // Propagate discarding up the chain (note that we bind with a copy
-  // of this future since 'this' might no longer be valid but other
-  // references might still exist.
-  // TODO(benh): Need to pass 'future' as a weak_ptr so that we can
-  // avoid reference counting cycles!
-  std::tr1::function<void(void)> discard =
-    std::tr1::bind(&Future<T>::discard, *this);
-
-  promise->future().onDiscarded(discard);
+  // Propagate discarding up the chain. To avoid cyclic dependencies,
+  // we keep a weak future in the callback.
+  promise->future().onDiscarded(
+      std::tr1::bind(&internal::discard<T>, WeakFuture<T>(*this)));
 
   return promise->future();
 }
@@ -894,15 +946,10 @@ Future<X> Future<T>::then(const std::tr1::function<X(const T&)>& f) const
 
   onAny(then);
 
-  // Propagate discarding up the chain (note that we bind with a copy
-  // of this future since 'this' might no longer be valid but other
-  // references might still exist.
-  // TODO(benh): Need to pass 'future' as a weak_ptr so that we can
-  // avoid reference counting cycles!
-  std::tr1::function<void(void)> discard =
-    std::tr1::bind(&Future<T>::discard, *this);
-
-  promise->future().onDiscarded(discard);
+  // Propagate discarding up the chain. To avoid cyclic dependencies,
+  // we keep a weak future in the callback.
+  promise->future().onDiscarded(
+      std::tr1::bind(&internal::discard<T>, WeakFuture<T>(*this)));
 
   return promise->future();
 }
@@ -928,12 +975,12 @@ auto Future<T>::then(F f) const
     }
   });
 
-  // TODO(benh): Need to use weak_ptr here so that we can avoid
-  // reference counting cycles!
-  Future<T> future(*this);
+  // Propagate discarding up the chain. To avoid cyclic dependencies,
+  // we keep a weak future in the callback.
+  WeakFuture<T> reference(*this);
 
   promise->future().onDiscarded([=] () {
-    future.discard(); // Need a non-const copy to discard.
+    internal::discard(reference);
   });
 
   return promise->future();

Reply via email to