Clear all callbacks when a future is completed.

Review: https://reviews.apache.org/r/30348
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/97e029a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/97e029a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/97e029a4

Branch: refs/heads/master
Commit: 97e029a434c40dcef21be02aaf7b8b1678ac7544
Parents: 977b80a
Author: Vinod Kone <[email protected]>
Authored: Tue Jan 27 17:09:23 2015 -0800
Committer: Vinod Kone <[email protected]>
Committed: Wed Jan 28 14:25:39 2015 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 50 +++++++++++++++++----
 1 file changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/97e029a4/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 0326b23..63c79ee 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -541,6 +541,9 @@ private:
     Data();
     ~Data();
 
+    // Clears all callbacks.
+    void clearAllCallbacks();
+
     int lock;
     State state;
     bool discard;
@@ -943,11 +946,14 @@ bool Promise<T>::discard(Future<T> future)
   // DISCARDED. We don't need a lock because the state is now in
   // DISCARDED so there should not be any concurrent modifications.
   if (result) {
+    // Run onDiscarded callbacks.
     internal::run(future.data->onDiscardedCallbacks);
-    future.data->onDiscardedCallbacks.clear();
 
+    // Run onAny callbacks.
     internal::run(future.data->onAnyCallbacks, future);
-    future.data->onAnyCallbacks.clear();
+
+    // Clear all callbacks since the future is complete.
+    future.data->clearAllCallbacks();
   }
 
   return result;
@@ -982,6 +988,17 @@ Future<T>::Data::~Data()
 
 
 template <typename T>
+void Future<T>::Data::clearAllCallbacks()
+{
+  onAnyCallbacks.clear();
+  onDiscardCallbacks.clear();
+  onDiscardedCallbacks.clear();
+  onFailedCallbacks.clear();
+  onReadyCallbacks.clear();
+}
+
+
+template <typename T>
 Future<T>::Future()
   : data(new Data())
 {}
@@ -1068,10 +1085,22 @@ bool Future<T>::discard()
 {
   bool result = false;
 
+  std::vector<DiscardCallback> callbacks;
   internal::acquire(&data->lock);
   {
     if (!data->discard && data->state == PENDING) {
       result = data->discard = true;
+
+      // NOTE: We make a copy of the onDiscard callbacks here
+      // because it is possible that another thread completes this
+      // future (ready, failed or discarded) when the current thread
+      // is out of this critical section but *before* it executed the
+      // onDiscard callbacks. If that happens, the other thread might
+      // be clearing the onDiscard callbacks (via clearAllCallbacks())
+      // while the current thread is executing or clearing the
+      // onDiscard callbacks, causing thread safety issue.
+      callbacks = data->onDiscardCallbacks;
+      data->onDiscardCallbacks.clear();
     }
   }
   internal::release(&data->lock);
@@ -1081,8 +1110,7 @@ bool Future<T>::discard()
   // be set so we won't be adding anything else to
   // 'Data::onDiscardCallbacks'.
   if (result) {
-    internal::run(data->onDiscardCallbacks);
-    data->onDiscardCallbacks.clear();
+    internal::run(callbacks);
   }
 
   return result;
@@ -1608,11 +1636,14 @@ bool Future<T>::set(const T& _t)
   // don't need a lock because the state is now in READY so there
   // should not be any concurrent modications.
   if (result) {
+    // Run onReady callbacks.
     internal::run(data->onReadyCallbacks, *data->t);
-    data->onReadyCallbacks.clear();
 
+    // Run onAny callbacks.
     internal::run(data->onAnyCallbacks, *this);
-    data->onAnyCallbacks.clear();
+
+    // Clear all callbacks since the future is complete.
+    data->clearAllCallbacks();
   }
 
   return result;
@@ -1638,11 +1669,14 @@ bool Future<T>::fail(const std::string& _message)
   // don't need a lock because the state is now in FAILED so there
   // should not be any concurrent modications.
   if (result) {
+    // Run onFailed callbacks.
     internal::run(data->onFailedCallbacks, *data->message);
-    data->onFailedCallbacks.clear();
 
+    // Run onAny callbacks.
     internal::run(data->onAnyCallbacks, *this);
-    data->onAnyCallbacks.clear();
+
+    // Clear all callbacks since the future is complete.
+    data->clearAllCallbacks();
   }
 
   return result;
