Factor out Future callback invocation. Move the repeated callback-draining loops in Future and Promise into a single helper, internal::run, to make the logic easier to read and to remove duplicated code.
Review: https://reviews.apache.org/r/28195 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2885c809 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2885c809 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2885c809 Branch: refs/heads/master Commit: 2885c809a0ee00e1ddbc72a10fbaa3456115eca0 Parents: 4b6e401 Author: Joris Van Remoortere <[email protected]> Authored: Fri Nov 21 08:03:37 2014 -0800 Committer: Benjamin Hindman <[email protected]> Committed: Fri Nov 21 08:03:39 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/future.hpp | 59 +++++++++------------ 1 file changed, 24 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/2885c809/3rdparty/libprocess/include/process/future.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp index 2e4f9ef..608934d 100644 --- a/3rdparty/libprocess/include/process/future.hpp +++ b/3rdparty/libprocess/include/process/future.hpp @@ -566,6 +566,23 @@ private: }; +namespace internal { + + // Helper for executing callbacks that have been registered. + template <typename C, typename... Arguments> + void run(std::queue<C>& callbacks, Arguments&&... arguments) + { + // TODO(jmlvanre): replace this with a vector and make parameter + // const since we can iterate it safely. + while (!callbacks.empty()) { + callbacks.front()(std::forward<Arguments>(arguments)...); + callbacks.pop(); + } + } + +} // namespace internal { + + // Represents a weak reference to a future. This class is used to // break cyclic dependencies between futures. template <typename T> @@ -929,17 +946,9 @@ bool Promise<T>::discard(Future<T> future) // DISCARDED. 
We don't need a lock because the state is now in // DISCARDED so there should not be any concurrent modifications. if (result) { - while (!data->onDiscardedCallbacks.empty()) { - // TODO(*): Invoke callbacks in another execution context. - data->onDiscardedCallbacks.front()(); - data->onDiscardedCallbacks.pop(); - } + internal::run(future.data->onDiscardedCallbacks); - while (!data->onAnyCallbacks.empty()) { - // TODO(*): Invoke callbacks in another execution context. - data->onAnyCallbacks.front()(future); - data->onAnyCallbacks.pop(); - } + internal::run(future.data->onAnyCallbacks, future); } return result; @@ -1073,11 +1082,7 @@ bool Future<T>::discard() // be set so we won't be adding anything else to // 'Data::onDiscardCallbacks'. if (result) { - while (!data->onDiscardCallbacks.empty()) { - // TODO(*): Invoke callbacks in another execution context. - data->onDiscardCallbacks.front()(); - data->onDiscardCallbacks.pop(); - } + internal::run(data->onDiscardCallbacks); } return result; @@ -1603,17 +1608,9 @@ bool Future<T>::set(const T& _t) // don't need a lock because the state is now in READY so there // should not be any concurrent modications. if (result) { - while (!data->onReadyCallbacks.empty()) { - // TODO(*): Invoke callbacks in another execution context. - data->onReadyCallbacks.front()(*data->t); - data->onReadyCallbacks.pop(); - } + internal::run(data->onReadyCallbacks, *data->t); - while (!data->onAnyCallbacks.empty()) { - // TODO(*): Invoke callbacks in another execution context. - data->onAnyCallbacks.front()(*this); - data->onAnyCallbacks.pop(); - } + internal::run(data->onAnyCallbacks, *this); } return result; @@ -1639,17 +1636,9 @@ bool Future<T>::fail(const std::string& _message) // don't need a lock because the state is now in FAILED so there // should not be any concurrent modications. if (result) { - while (!data->onFailedCallbacks.empty()) { - // TODO(*): Invoke callbacks in another execution context. 
- data->onFailedCallbacks.front()(*data->message); - data->onFailedCallbacks.pop(); - } + internal::run(data->onFailedCallbacks, *data->message); - while (!data->onAnyCallbacks.empty()) { - // TODO(*): Invoke callbacks in another execution context. - data->onAnyCallbacks.front()(*this); - data->onAnyCallbacks.pop(); - } + internal::run(data->onAnyCallbacks, *this); } return result;
