Clear all callbacks when a future is completed.

Review: https://reviews.apache.org/r/30348


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/97e029a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/97e029a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/97e029a4

Branch: refs/heads/master
Commit: 97e029a434c40dcef21be02aaf7b8b1678ac7544
Parents: 977b80a
Author: Vinod Kone <[email protected]>
Authored: Tue Jan 27 17:09:23 2015 -0800
Committer: Vinod Kone <[email protected]>
Committed: Wed Jan 28 14:25:39 2015 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/future.hpp | 50 +++++++++++++++++----
 1 file changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/97e029a4/3rdparty/libprocess/include/process/future.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 0326b23..63c79ee 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -541,6 +541,9 @@ private:
     Data();
     ~Data();
 
+    // Clears all callbacks.
+    void clearAllCallbacks();
+
     int lock;
     State state;
     bool discard;
@@ -943,11 +946,14 @@ bool Promise<T>::discard(Future<T> future)
   // DISCARDED. We don't need a lock because the state is now in
   // DISCARDED so there should not be any concurrent modifications.
   if (result) {
+    // Run onDiscarded callbacks.
     internal::run(future.data->onDiscardedCallbacks);
-    future.data->onDiscardedCallbacks.clear();
 
+    // Run onAny callbacks.
     internal::run(future.data->onAnyCallbacks, future);
-    future.data->onAnyCallbacks.clear();
+
+    // Clear all callbacks since the future is complete.
+    future.data->clearAllCallbacks();
   }
 
   return result;
@@ -982,6 +988,17 @@ Future<T>::Data::~Data()
 
 
 template <typename T>
+void Future<T>::Data::clearAllCallbacks()
+{
+  onAnyCallbacks.clear();
+  onDiscardCallbacks.clear();
+  onDiscardedCallbacks.clear();
+  onFailedCallbacks.clear();
+  onReadyCallbacks.clear();
+}
+
+
+template <typename T>
 Future<T>::Future()
   : data(new Data()) {}
 
@@ -1068,10 +1085,22 @@ bool Future<T>::discard()
 {
   bool result = false;
 
+  std::vector<DiscardCallback> callbacks;
   internal::acquire(&data->lock);
   {
     if (!data->discard && data->state == PENDING) {
       result = data->discard = true;
+
+      // NOTE: We make a copy of the onDiscard callbacks here
+      // because it is possible that another thread completes this
+      // future (ready, failed or discarded) when the current thread
+      // is out of this critical section but *before* it executed the
+      // onDiscard callbacks. If that happens, the other thread might
+      // be clearing the onDiscard callbacks (via clearAllCallbacks())
+      // while the current thread is executing or clearing the
+      // onDiscard callbacks, causing thread safety issue.
+      callbacks = data->onDiscardCallbacks;
+      data->onDiscardCallbacks.clear();
     }
   }
   internal::release(&data->lock);
@@ -1081,8 +1110,7 @@ bool Future<T>::discard()
   // be set so we won't be adding anything else to
   // 'Data::onDiscardCallbacks'.
   if (result) {
-    internal::run(data->onDiscardCallbacks);
-    data->onDiscardCallbacks.clear();
+    internal::run(callbacks);
   }
 
   return result;
@@ -1608,11 +1636,14 @@ bool Future<T>::set(const T& _t)
   // don't need a lock because the state is now in READY so there
   // should not be any concurrent modications.
   if (result) {
+    // Run onReady callbacks.
     internal::run(data->onReadyCallbacks, *data->t);
-    data->onReadyCallbacks.clear();
 
+    // Run onAny callbacks.
     internal::run(data->onAnyCallbacks, *this);
-    data->onAnyCallbacks.clear();
+
+    // Clear all callbacks since the future is complete.
+    data->clearAllCallbacks();
   }
 
   return result;
@@ -1638,11 +1669,14 @@ bool Future<T>::fail(const std::string& _message)
   // don't need a lock because the state is now in FAILED so there
   // should not be any concurrent modications.
   if (result) {
+    // Run onFailed callbacks.
     internal::run(data->onFailedCallbacks, *data->message);
-    data->onFailedCallbacks.clear();
 
+    // Run onAny callbacks.
     internal::run(data->onAnyCallbacks, *this);
-    data->onAnyCallbacks.clear();
+
+    // Clear all callbacks since the future is complete.
+    data->clearAllCallbacks();
   }
 
   return result;

Reply via email to