Repository: mesos Updated Branches: refs/heads/master 5db3fea16 -> 8bfc4d471
Added a Sequence abstraction. Review: https://reviews.apache.org/r/17476 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8bfc4d47 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8bfc4d47 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8bfc4d47 Branch: refs/heads/master Commit: 8bfc4d4711d88995101861d15a5dc6303ad6d904 Parents: 5db3fea Author: Jie Yu <yujie....@gmail.com> Authored: Mon Jan 27 13:32:57 2014 -0800 Committer: Jie Yu <yujie....@gmail.com> Committed: Tue Mar 4 15:38:02 2014 -0800 ---------------------------------------------------------------------- 3rdparty/libprocess/Makefile.am | 2 + .../libprocess/include/process/sequence.hpp | 177 +++++++++++++ .../libprocess/src/tests/sequence_tests.cpp | 257 +++++++++++++++++++ 3 files changed, 436 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8bfc4d47/3rdparty/libprocess/Makefile.am ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am index a33b0ae..08a709a 100644 --- a/3rdparty/libprocess/Makefile.am +++ b/3rdparty/libprocess/Makefile.am @@ -93,6 +93,7 @@ libprocess_la_SOURCES += \ $(top_srcdir)/include/process/protobuf.hpp \ $(top_srcdir)/include/process/reap.hpp \ $(top_srcdir)/include/process/run.hpp \ + $(top_srcdir)/include/process/sequence.hpp \ $(top_srcdir)/include/process/shared.hpp \ $(top_srcdir)/include/process/socket.hpp \ $(top_srcdir)/include/process/statistics.hpp \ @@ -115,6 +116,7 @@ tests_SOURCES = \ src/tests/owned_tests.cpp \ src/tests/process_tests.cpp \ src/tests/reap_tests.cpp \ + src/tests/sequence_tests.cpp \ src/tests/shared_tests.cpp \ src/tests/statistics_tests.cpp \ src/tests/subprocess_tests.cpp \ http://git-wip-us.apache.org/repos/asf/mesos/blob/8bfc4d47/3rdparty/libprocess/include/process/sequence.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/sequence.hpp b/3rdparty/libprocess/include/process/sequence.hpp new file mode 100644 index 0000000..241afd2 --- /dev/null +++ b/3rdparty/libprocess/include/process/sequence.hpp @@ -0,0 +1,177 @@ +#ifndef __PROCESS_SEQUENCE_HPP__ +#define __PROCESS_SEQUENCE_HPP__ + +#include <glog/logging.h> + +#include <process/future.hpp> +#include <process/owned.hpp> +#include <process/process.hpp> + +#include <stout/lambda.hpp> +#include <stout/nothing.hpp> + +namespace process { + +// Forward declaration. +class SequenceProcess; + +// Provides an abstraction that serializes the execution of a sequence +// of callbacks. +class Sequence +{ +public: + Sequence(); + ~Sequence(); + + // Registers a callback that will be invoked when all the futures + // returned by the previously registered callbacks are in + // non-pending status (i.e., ready, discarded or failed). Due to + // these semantics, we should avoid registering a callback which + // returns a future that could be in non-pending status for a long + // time because it will prevent all the subsequent callbacks from + // being invoked. This is analogous to the requirement that we want + // to avoid invoking blocking function in a libprocess handler. A + // user is allowed to cancel a registered callback by discarding the + // returned future. Other callbacks in this sequence will NOT be + // affected. The subsequent callbacks will not be invoked until the + // future is actually DISCARDED. + template <typename T> + Future<T> add(const lambda::function<Future<T>(void)>& callback); + +private: + // Not copyable, not assignable. + Sequence(const Sequence&); + Sequence& operator = (const Sequence&); + + SequenceProcess* process; +}; + + +class SequenceProcess : public Process<SequenceProcess> +{ +public: + SequenceProcess() : last(Nothing()) {} + + template <typename T> + Future<T> add(const lambda::function<Future<T>(void)>& callback) + { + // This is the future that is used to notify the next callback + // (denoted by 'N' in the following graph). + Owned<Promise<Nothing> > notifier(new Promise<Nothing>()); + + // This is the future that will be returned to the user (denoted + // by 'F' in the following graph). + Owned<Promise<T> > promise(new Promise<T>()); + + // We use a graph to show how we hook these futures. Each box in + // the graph represents a future. As mentioned above, 'F' denotes + // a future that will be returned to the user, and 'N' denotes a + // future that is used for notifying the next callback. Each arrow + // represents a "notification" relation. We will explain in detail + // what "notification" means in the following. + // + // 'last' 'last' 'last' + // | | | + // v v v + // +---+ +---+ +---+ +---+ +---+ +---+ + // | N | | N |--+ | N | | N |--+ | N |--+ | N | + // +---+ +---+ | +---+ +---+ | +---+ | +---+ + // ==> | ^ ==> | ^ | ^ + // | | | | | | + // | +---+ | +---+ | +---+ + // +-->| F | +-->| F | +-->| F | + // +---+ +---+ +---+ + // + // Initial => Added one callback => Added two callbacks + + // Setup the "notification" from 'F' to 'N' so that when a + // callback is done, signal the notifier ('N'). + promise->future().onAny(lambda::bind(&completed, notifier)); + + // Setup the "notification" from previous 'N' to 'F' so that when + // a notifier ('N') is set (indicating the previous callback has + // completed), invoke the next callback ('F') in the sequence. + last.onAny(lambda::bind(¬ified<T>, promise, callback)); + + // In the following, we setup the hooks so that if this sequence + // process is being terminated, all pending callbacks will be + // discarded. We use weak futures here to avoid cyclic dependencies. + + // Discard the future associated with this notifier. + notifier->future().onDiscard( + lambda::bind( + &internal::discard<T>, + WeakFuture<T>(promise->future()))); + + // Discard the notifier associated with the previous future. + notifier->future().onDiscard( + lambda::bind( + &internal::discard<Nothing>, + WeakFuture<Nothing>(last))); + + // Update the 'last'. + last = notifier->future(); + + return promise->future(); + } + +protected: + virtual void finalize() + { + last.discard(); + + // TODO(jieyu): Do we need to wait for the future of the last + // callback to be in DISCARDED state? + } + +private: + // Invoked when a callback is done. + static void completed(Owned<Promise<Nothing> > notifier) + { + notifier->set(Nothing()); + } + + // Invoked when a notifier is set. + template <typename T> + static void notified( + Owned<Promise<T> > promise, + const lambda::function<Future<T>(void)>& callback) + { + if (promise->future().hasDiscard()) { + // The user has shown the intention to discard this callback + // (i.e., by calling future.discard()). As a result, we will + // just skip this callback. + promise->discard(); + } else { + promise->associate(callback()); + } + } + + Future<Nothing> last; +}; + + +inline Sequence::Sequence() +{ + process = new SequenceProcess(); + process::spawn(process); +} + + +inline Sequence::~Sequence() +{ + process::terminate(process); + process::wait(process); + delete process; +} + + +template <typename T> +Future<T> Sequence::add(const lambda::function<Future<T>(void)>& callback) +{ + return dispatch(process, &SequenceProcess::add<T>, callback); +} + +} // namespace process { + +#endif // __PROCESS_SEQUENCE_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/8bfc4d47/3rdparty/libprocess/src/tests/sequence_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/sequence_tests.cpp b/3rdparty/libprocess/src/tests/sequence_tests.cpp new file mode 100644 index 0000000..6b5d161 --- /dev/null +++ b/3rdparty/libprocess/src/tests/sequence_tests.cpp @@ -0,0 +1,257 @@ +#include <stdlib.h> + +#include <gmock/gmock.h> + +#include <process/defer.hpp> +#include <process/dispatch.hpp> +#include <process/future.hpp> +#include <process/gtest.hpp> +#include <process/gmock.hpp> +#include <process/process.hpp> +#include <process/sequence.hpp> + +#include <stout/nothing.hpp> + +using namespace process; + +using testing::_; +using testing::Return; + +class TestProcess : public Process<TestProcess> +{ +public: + Future<Nothing> foo() + { + return dispatch(self(), &Self::_foo); + } + + Future<Nothing> _foo() + { + return dispatch(self(), &Self::__foo); + } + + Future<Nothing> __foo() + { + return promise.future(); + } + + Nothing bar() + { + return Nothing(); + } + + Promise<Nothing> promise; +}; + + +// The test verifies that callbacks are properly serialized by the +// Sequence object. +TEST(Sequence, Serialize) +{ + TestProcess process; + spawn(process); + + Sequence sequence; + + Future<Nothing> bar = FUTURE_DISPATCH(_, &TestProcess::bar); + + lambda::function<Future<Nothing>(void)> f; + + f = defer(process, &TestProcess::foo); + sequence.add(f); + + f = defer(process, &TestProcess::bar); + sequence.add(f); + + // Flush the event queue to make sure that if the method 'bar' could + // have been invoked, the future 'bar' would be satisfied before the + // pending check below. + Clock::pause(); + Clock::settle(); + Clock::resume(); + + EXPECT_TRUE(bar.isPending()); + + process.promise.set(Nothing()); + + AWAIT_READY(bar); + + terminate(process); + wait(process); +} + + +// Used to verify the discard semantics of Sequence. +class DiscardProcess : public Process<DiscardProcess> +{ +public: + Future<Nothing> func0() { return promise.future(); } + + MOCK_METHOD0(func1, Nothing(void)); + MOCK_METHOD0(func2, Nothing(void)); + MOCK_METHOD0(func3, Nothing(void)); + + Promise<Nothing> promise; +}; + + +// The tests verifies semantics of discarding one returned future. +TEST(Sequence, DiscardOne) +{ + DiscardProcess process; + spawn(process); + + Sequence sequence; + + lambda::function<Future<Nothing>(void)> f; + + f = defer(process, &DiscardProcess::func0); + Future<Nothing> f0 = sequence.add(f); + + f = defer(process, &DiscardProcess::func1); + Future<Nothing> f1 = sequence.add(f); + + f = defer(process, &DiscardProcess::func2); + Future<Nothing> f2 = sequence.add(f); + + f = defer(process, &DiscardProcess::func3); + Future<Nothing> f3 = sequence.add(f); + + EXPECT_CALL(process, func1()) + .WillOnce(Return(Nothing())); + + EXPECT_CALL(process, func2()) + .Times(0); + + EXPECT_CALL(process, func3()) + .WillOnce(Return(Nothing())); + + // Flush the event queue to make sure that all callbacks have been + // added to the sequence. + Clock::pause(); + Clock::settle(); + Clock::resume(); + + f2.discard(); + + // Start the sequence of calls. + process.promise.set(Nothing()); + + AWAIT_READY(f3); + + terminate(process); + wait(process); +} + + +// The test verifies the semantics of deleting the Sequence object, +// which will result in all pending callbacks being discarded. +TEST(Sequence, DiscardAll) +{ + DiscardProcess process; + spawn(process); + + Sequence* sequence = new Sequence(); + + lambda::function<Future<Nothing>(void)> f; + + f = defer(process, &DiscardProcess::func0); + Future<Nothing> f0 = sequence->add(f); + + f = defer(process, &DiscardProcess::func1); + Future<Nothing> f1 = sequence->add(f); + + f = defer(process, &DiscardProcess::func2); + Future<Nothing> f2 = sequence->add(f); + + f = defer(process, &DiscardProcess::func3); + Future<Nothing> f3 = sequence->add(f); + + EXPECT_CALL(process, func1()) + .Times(0); + + EXPECT_CALL(process, func2()) + .Times(0); + + EXPECT_CALL(process, func3()) + .Times(0); + + // Flush the event queue to make sure that all callbacks have been + // added to the sequence. + Clock::pause(); + Clock::settle(); + Clock::resume(); + + // This should cancel all pending callbacks. + delete sequence; + + // Start the sequence of calls. + process.promise.set(Nothing()); + + AWAIT_READY(f0); + AWAIT_DISCARDED(f1); + AWAIT_DISCARDED(f2); + AWAIT_DISCARDED(f3); + + terminate(process); + wait(process); +} + + +class RandomProcess : public Process<RandomProcess> +{ +public: + RandomProcess() : value(0) {} + + Nothing verify() + { + EXPECT_EQ(0, value); + return Nothing(); + } + + Future<Nothing> pulse() + { + value++; + return dispatch(self(), &Self::_pulse); + } + + Nothing _pulse() + { + value--; + return Nothing(); + } + +private: + int value; +}; + + +TEST(Sequence, Random) +{ + RandomProcess process; + spawn(process); + + Sequence sequence; + + for (int i = 0; i < 100; i++) { + lambda::function<Future<Nothing>(void)> f; + + // We randomly do 'pulse' and 'verify'. The idea here is that: if + // sequence is not used, a 'verify' may see an intermediate + // result of a 'pulse', in which case the value is not zero. + if (::random() % 2 == 0) { + f = defer(process, &RandomProcess::pulse); + } else { + f = defer(process, &RandomProcess::verify); + } + + sequence.add(f); + } + + Clock::pause(); + Clock::settle(); + Clock::resume(); + + terminate(process); + wait(process); +}