Repository: mesos
Updated Branches:
  refs/heads/master 5db3fea16 -> 8bfc4d471


Added a Sequence abstraction.

Review: https://reviews.apache.org/r/17476


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8bfc4d47
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8bfc4d47
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8bfc4d47

Branch: refs/heads/master
Commit: 8bfc4d4711d88995101861d15a5dc6303ad6d904
Parents: 5db3fea
Author: Jie Yu <yujie....@gmail.com>
Authored: Mon Jan 27 13:32:57 2014 -0800
Committer: Jie Yu <yujie....@gmail.com>
Committed: Tue Mar 4 15:38:02 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   2 +
 .../libprocess/include/process/sequence.hpp     | 177 +++++++++++++
 .../libprocess/src/tests/sequence_tests.cpp     | 257 +++++++++++++++++++
 3 files changed, 436 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8bfc4d47/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index a33b0ae..08a709a 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -93,6 +93,7 @@ libprocess_la_SOURCES +=                                      \
   $(top_srcdir)/include/process/protobuf.hpp                   \
   $(top_srcdir)/include/process/reap.hpp                       \
   $(top_srcdir)/include/process/run.hpp                                \
+  $(top_srcdir)/include/process/sequence.hpp                   \
   $(top_srcdir)/include/process/shared.hpp                     \
   $(top_srcdir)/include/process/socket.hpp                     \
   $(top_srcdir)/include/process/statistics.hpp                 \
@@ -115,6 +116,7 @@ tests_SOURCES =                                                     \
   src/tests/owned_tests.cpp                                    \
   src/tests/process_tests.cpp                                  \
   src/tests/reap_tests.cpp                                     \
+  src/tests/sequence_tests.cpp                                 \
   src/tests/shared_tests.cpp                                   \
   src/tests/statistics_tests.cpp                               \
   src/tests/subprocess_tests.cpp                               \

http://git-wip-us.apache.org/repos/asf/mesos/blob/8bfc4d47/3rdparty/libprocess/include/process/sequence.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/sequence.hpp b/3rdparty/libprocess/include/process/sequence.hpp
new file mode 100644
index 0000000..241afd2
--- /dev/null
+++ b/3rdparty/libprocess/include/process/sequence.hpp
@@ -0,0 +1,177 @@
+#ifndef __PROCESS_SEQUENCE_HPP__
+#define __PROCESS_SEQUENCE_HPP__
+
+#include <glog/logging.h>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+
+namespace process {
+
+// Forward declaration.
+class SequenceProcess;
+
+// Provides an abstraction that serializes the execution of a sequence
+// of callbacks.
+class Sequence
+{
+public:
+  Sequence();
+  ~Sequence();
+
+  // Registers a callback that will be invoked when all the futures
+  // returned by the previously registered callbacks are in
+  // non-pending status (i.e., ready, discarded or failed). Due to
+  // these semantics, we should avoid registering a callback which
+  // returns a future that could be in pending status for a long
+  // time because it will prevent all the subsequent callbacks from
+  // being invoked. This is analogous to the requirement that we want
+  // to avoid invoking a blocking function in a libprocess handler. A
+  // user is allowed to cancel a registered callback by discarding the
+  // returned future. Other callbacks in this sequence will NOT be
+  // affected. The subsequent callbacks will not be invoked until the
+  // future is actually DISCARDED.
+  template <typename T>
+  Future<T> add(const lambda::function<Future<T>(void)>& callback);
+
+private:
+  // Not copyable, not assignable.
+  Sequence(const Sequence&);
+  Sequence& operator = (const Sequence&);
+
+  SequenceProcess* process;
+};
+
+
+class SequenceProcess : public Process<SequenceProcess>
+{
+public:
+  SequenceProcess() : last(Nothing()) {}
+
+  template <typename T>
+  Future<T> add(const lambda::function<Future<T>(void)>& callback)
+  {
+    // This is the future that is used to notify the next callback
+    // (denoted by 'N' in the following graph).
+    Owned<Promise<Nothing> > notifier(new Promise<Nothing>());
+
+    // This is the future that will be returned to the user (denoted
+    // by 'F' in the following graph).
+    Owned<Promise<T> > promise(new Promise<T>());
+
+    // We use a graph to show how we hook these futures. Each box in
+    // the graph represents a future. As mentioned above, 'F' denotes
+    // a future that will be returned to the user, and 'N' denotes a
+    // future that is used for notifying the next callback. Each arrow
+    // represents a "notification" relation. We will explain in detail
+    // what "notification" means in the following.
+    //
+    // 'last'                 'last'                            'last'
+    //   |                      |                                 |
+    //   v                      v                                 v
+    // +---+       +---+      +---+       +---+      +---+      +---+
+    // | N |       | N |--+   | N |       | N |--+   | N |--+   | N |
+    // +---+       +---+  |   +---+       +---+  |   +---+  |   +---+
+    //        ==>         |     ^    ==>         |     ^    |     ^
+    //                    |     |                |     |    |     |
+    //                    |   +---+              |   +---+  |   +---+
+    //                    +-->| F |              +-->| F |  +-->| F |
+    //                        +---+                  +---+      +---+
+    //
+    // Initial =>  Added one callback  =>    Added two callbacks
+
+    // Setup the "notification" from 'F' to 'N' so that when a
+    // callback is done, signal the notifier ('N').
+    promise->future().onAny(lambda::bind(&completed, notifier));
+
+    // Setup the "notification" from previous 'N' to 'F' so that when
+    // a notifier ('N') is set (indicating the previous callback has
+    // completed), invoke the next callback ('F') in the sequence.
+    last.onAny(lambda::bind(&notified<T>, promise, callback));
+
+    // In the following, we setup the hooks so that if this sequence
+    // process is being terminated, all pending callbacks will be
+    // discarded. We use weak futures here to avoid cyclic dependencies.
+
+    // Discard the future associated with this notifier.
+    notifier->future().onDiscard(
+        lambda::bind(
+            &internal::discard<T>,
+            WeakFuture<T>(promise->future())));
+
+    // Discard the notifier associated with the previous future.
+    notifier->future().onDiscard(
+        lambda::bind(
+            &internal::discard<Nothing>,
+            WeakFuture<Nothing>(last)));
+
+    // Update the 'last'.
+    last = notifier->future();
+
+    return promise->future();
+  }
+
+protected:
+  virtual void finalize()
+  {
+    last.discard();
+
+    // TODO(jieyu): Do we need to wait for the future of the last
+    // callback to be in DISCARDED state?
+  }
+
+private:
+  // Invoked when a callback is done.
+  static void completed(Owned<Promise<Nothing> > notifier)
+  {
+    notifier->set(Nothing());
+  }
+
+  // Invoked when a notifier is set.
+  template <typename T>
+  static void notified(
+      Owned<Promise<T> > promise,
+      const lambda::function<Future<T>(void)>& callback)
+  {
+    if (promise->future().hasDiscard()) {
+      // The user has shown the intention to discard this callback
+      // (i.e., by calling future.discard()). As a result, we will
+      // just skip this callback.
+      promise->discard();
+    } else {
+      promise->associate(callback());
+    }
+  }
+
+  Future<Nothing> last;
+};
+
+
+inline Sequence::Sequence()
+{
+  process = new SequenceProcess();
+  process::spawn(process);
+}
+
+
+inline Sequence::~Sequence()
+{
+  process::terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+template <typename T>
+Future<T> Sequence::add(const lambda::function<Future<T>(void)>& callback)
+{
+  return dispatch(process, &SequenceProcess::add<T>, callback);
+}
+
+} // namespace process {
+
+#endif // __PROCESS_SEQUENCE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/8bfc4d47/3rdparty/libprocess/src/tests/sequence_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/sequence_tests.cpp b/3rdparty/libprocess/src/tests/sequence_tests.cpp
new file mode 100644
index 0000000..6b5d161
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/sequence_tests.cpp
@@ -0,0 +1,257 @@
+#include <stdlib.h>
+
+#include <gmock/gmock.h>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/gtest.hpp>
+#include <process/gmock.hpp>
+#include <process/process.hpp>
+#include <process/sequence.hpp>
+
+#include <stout/nothing.hpp>
+
+using namespace process;
+
+using testing::_;
+using testing::Return;
+
+class TestProcess : public Process<TestProcess>
+{
+public:
+  Future<Nothing> foo()
+  {
+    return dispatch(self(), &Self::_foo);
+  }
+
+  Future<Nothing> _foo()
+  {
+    return dispatch(self(), &Self::__foo);
+  }
+
+  Future<Nothing> __foo()
+  {
+    return promise.future();
+  }
+
+  Nothing bar()
+  {
+    return Nothing();
+  }
+
+  Promise<Nothing> promise;
+};
+
+
+// The test verifies that callbacks are properly serialized by the
+// Sequence object.
+TEST(Sequence, Serialize)
+{
+  TestProcess process;
+  spawn(process);
+
+  Sequence sequence;
+
+  Future<Nothing> bar = FUTURE_DISPATCH(_, &TestProcess::bar);
+
+  lambda::function<Future<Nothing>(void)> f;
+
+  f = defer(process, &TestProcess::foo);
+  sequence.add(f);
+
+  f = defer(process, &TestProcess::bar);
+  sequence.add(f);
+
+  // Flush the event queue to make sure that if the method 'bar' could
+  // have been invoked, the future 'bar' would be satisfied before the
+  // pending check below.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  EXPECT_TRUE(bar.isPending());
+
+  process.promise.set(Nothing());
+
+  AWAIT_READY(bar);
+
+  terminate(process);
+  wait(process);
+}
+
+
+// Used to verify the discard semantics of Sequence.
+class DiscardProcess : public Process<DiscardProcess>
+{
+public:
+  Future<Nothing> func0() { return promise.future(); }
+
+  MOCK_METHOD0(func1, Nothing(void));
+  MOCK_METHOD0(func2, Nothing(void));
+  MOCK_METHOD0(func3, Nothing(void));
+
+  Promise<Nothing> promise;
+};
+
+
+// The test verifies the semantics of discarding one returned future.
+TEST(Sequence, DiscardOne)
+{
+  DiscardProcess process;
+  spawn(process);
+
+  Sequence sequence;
+
+  lambda::function<Future<Nothing>(void)> f;
+
+  f = defer(process, &DiscardProcess::func0);
+  Future<Nothing> f0 = sequence.add(f);
+
+  f = defer(process, &DiscardProcess::func1);
+  Future<Nothing> f1 = sequence.add(f);
+
+  f = defer(process, &DiscardProcess::func2);
+  Future<Nothing> f2 = sequence.add(f);
+
+  f = defer(process, &DiscardProcess::func3);
+  Future<Nothing> f3 = sequence.add(f);
+
+  EXPECT_CALL(process, func1())
+    .WillOnce(Return(Nothing()));
+
+  EXPECT_CALL(process, func2())
+    .Times(0);
+
+  EXPECT_CALL(process, func3())
+    .WillOnce(Return(Nothing()));
+
+  // Flush the event queue to make sure that all callbacks have been
+  // added to the sequence.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  f2.discard();
+
+  // Start the sequence of calls.
+  process.promise.set(Nothing());
+
+  AWAIT_READY(f3);
+
+  terminate(process);
+  wait(process);
+}
+
+
+// The test verifies the semantics of deleting the Sequence object,
+// which will result in all pending callbacks being discarded.
+TEST(Sequence, DiscardAll)
+{
+  DiscardProcess process;
+  spawn(process);
+
+  Sequence* sequence = new Sequence();
+
+  lambda::function<Future<Nothing>(void)> f;
+
+  f = defer(process, &DiscardProcess::func0);
+  Future<Nothing> f0 = sequence->add(f);
+
+  f = defer(process, &DiscardProcess::func1);
+  Future<Nothing> f1 = sequence->add(f);
+
+  f = defer(process, &DiscardProcess::func2);
+  Future<Nothing> f2 = sequence->add(f);
+
+  f = defer(process, &DiscardProcess::func3);
+  Future<Nothing> f3 = sequence->add(f);
+
+  EXPECT_CALL(process, func1())
+    .Times(0);
+
+  EXPECT_CALL(process, func2())
+    .Times(0);
+
+  EXPECT_CALL(process, func3())
+    .Times(0);
+
+  // Flush the event queue to make sure that all callbacks have been
+  // added to the sequence.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  // This should cancel all pending callbacks.
+  delete sequence;
+
+  // Start the sequence of calls.
+  process.promise.set(Nothing());
+
+  AWAIT_READY(f0);
+  AWAIT_DISCARDED(f1);
+  AWAIT_DISCARDED(f2);
+  AWAIT_DISCARDED(f3);
+
+  terminate(process);
+  wait(process);
+}
+
+
+class RandomProcess : public Process<RandomProcess>
+{
+public:
+  RandomProcess() : value(0) {}
+
+  Nothing verify()
+  {
+    EXPECT_EQ(0, value);
+    return Nothing();
+  }
+
+  Future<Nothing> pulse()
+  {
+    value++;
+    return dispatch(self(), &Self::_pulse);
+  }
+
+  Nothing _pulse()
+  {
+    value--;
+    return Nothing();
+  }
+
+private:
+  int value;
+};
+
+
+TEST(Sequence, Random)
+{
+  RandomProcess process;
+  spawn(process);
+
+  Sequence sequence;
+
+  for (int i = 0; i < 100; i++) {
+    lambda::function<Future<Nothing>(void)> f;
+
+    // We randomly do 'pulse' and 'verify'. The idea here is that if
+    // a sequence were not used, a 'verify' could see an intermediate
+    // result of a 'pulse', in which case the value is not zero.
+    if (::random() % 2 == 0) {
+      f = defer(process, &RandomProcess::pulse);
+    } else {
+      f = defer(process, &RandomProcess::verify);
+    }
+
+    sequence.add(f);
+  }
+
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  terminate(process);
+  wait(process);
+}

Reply via email to