Updated Branches: refs/heads/master 280895fbb -> ec8a16962
Added a WeakFuture<T>. Using a WeakFuture<T> fixes memory leaks in Future::then and Promise::associate. From: Jie Yu <[email protected]> Review: https://reviews.apache.org/r/16197 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ec8a1696 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ec8a1696 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ec8a1696 Branch: refs/heads/master Commit: ec8a16962a401c15cd28f1247f0777f1ea76fc6c Parents: 280895f Author: Benjamin Hindman <[email protected]> Authored: Thu Dec 12 15:01:03 2013 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Thu Dec 12 15:01:04 2013 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/future.hpp | 117 ++++++++++++++------ 1 file changed, 82 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ec8a1696/3rdparty/libprocess/include/process/future.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp index e473b3d..4122b96 100644 --- a/3rdparty/libprocess/include/process/future.hpp +++ b/3rdparty/libprocess/include/process/future.hpp @@ -19,6 +19,7 @@ #include <stout/duration.hpp> #include <stout/error.hpp> +#include <stout/none.hpp> #include <stout/option.hpp> #include <stout/preprocessor.hpp> @@ -43,6 +44,11 @@ template <typename T> class Promise; +// Forward declaration of WeakFuture. +template <typename T> +class WeakFuture; + + // Definition of a "shared" future. A future can hold any // copy-constructible value. A future is considered "shared" because // by default a future can be accessed concurrently. @@ -215,17 +221,7 @@ public: private: friend class Promise<T>; - - // Sets the value for this future, unless the future is already set, - // failed, or discarded, in which case it returns false. - bool set(const T& _t); - - // Sets this future as failed, unless the future is already set, - // failed, or discarded, in which case it returns false. - bool fail(const std::string& _message); - - void copy(const Future<T>& that); - void cleanup(); + friend class WeakFuture<T>; enum State { @@ -251,10 +247,70 @@ private: std::queue<AnyCallback> onAnyCallbacks; }; + // Sets the value for this future, unless the future is already set, + // failed, or discarded, in which case it returns false. + bool set(const T& _t); + + // Sets this future as failed, unless the future is already set, + // failed, or discarded, in which case it returns false. + bool fail(const std::string& _message); + std::tr1::shared_ptr<Data> data; }; +// Represents a weak reference to a future. This class is used to +// break cyclic dependencies between futures. +template <typename T> +class WeakFuture +{ +public: + WeakFuture(const Future<T>& future); + + // Converts this weak reference to a concrete future. Returns none + // if the conversion is not successful. + Option<Future<T> > get(); + +private: + std::tr1::weak_ptr<typename Future<T>::Data> data; +}; + + +template <typename T> +WeakFuture<T>::WeakFuture(const Future<T>& future) + : data(future.data) {} + + +template <typename T> +Option<Future<T> > WeakFuture<T>::get() +{ + Future<T> future; + future.data = data.lock(); + + if (future.data) { + return future; + } else { + return None(); + } +} + + +namespace internal { + +// Discards a weak future. If the weak future is invalid (i.e., the +// future it references to has already been destroyed), this operation +// is treated as a no-op. +template <typename T> +void discard(WeakFuture<T> reference) +{ + Option<Future<T> > future = reference.get(); + if (future.isSome()) { + future.get().discard(); + } +} + +} // namespace internal { + // Helper for creating failed futures. struct _Failure { @@ -351,8 +407,9 @@ bool Promise<T>::associate(const Future<T>& future) // future gets discarded, the other future will also get discarded. // For 'set' and 'fail', they are associated only in one direction. // In other words, calling 'set' or 'fail' on this promise will not - // affect the result of the future that we associated. - f.onDiscarded(std::tr1::bind(&Future<T>::discard, future)); + // affect the result of the future that we associated. To avoid + // cyclic dependencies, we keep a weak future in the callback. + f.onDiscarded(std::tr1::bind(&internal::discard<T>, WeakFuture<T>(future))); if (!f.isPending()) { return false; @@ -866,15 +923,10 @@ Future<X> Future<T>::then(const std::tr1::function<Future<X>(const T&)>& f) cons onAny(thenf); - // Propagate discarding up the chain (note that we bind with a copy - // of this future since 'this' might no longer be valid but other - // references might still exist. - // TODO(benh): Need to pass 'future' as a weak_ptr so that we can - // avoid reference counting cycles! - std::tr1::function<void(void)> discard = - std::tr1::bind(&Future<T>::discard, *this); - - promise->future().onDiscarded(discard); + // Propagate discarding up the chain. To avoid cyclic dependencies, + // we keep a weak future in the callback. + promise->future().onDiscarded( + std::tr1::bind(&internal::discard<T>, WeakFuture<T>(*this))); return promise->future(); } @@ -894,15 +946,10 @@ Future<X> Future<T>::then(const std::tr1::function<X(const T&)>& f) const onAny(then); - // Propagate discarding up the chain (note that we bind with a copy - // of this future since 'this' might no longer be valid but other - // references might still exist. - // TODO(benh): Need to pass 'future' as a weak_ptr so that we can - // avoid reference counting cycles! - std::tr1::function<void(void)> discard = - std::tr1::bind(&Future<T>::discard, *this); - - promise->future().onDiscarded(discard); + // Propagate discarding up the chain. To avoid cyclic dependencies, + // we keep a weak future in the callback. + promise->future().onDiscarded( + std::tr1::bind(&internal::discard<T>, WeakFuture<T>(*this))); return promise->future(); } @@ -928,12 +975,12 @@ auto Future<T>::then(F f) const } }); - // TODO(benh): Need to use weak_ptr here so that we can avoid - // reference counting cycles! - Future<T> future(*this); + // Propagate discarding up the chain. To avoid cyclic dependencies, + // we keep a weak future in the callback. + WeakFuture<T> reference(*this); promise->future().onDiscarded([=] () { - future.discard(); // Need a non-const copy to discard. + internal::discard(reference); }); return promise->future();
