Fixed scheduler library to send calls in order.

Review: https://reviews.apache.org/r/37467


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4599f9fe
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4599f9fe
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4599f9fe

Branch: refs/heads/master
Commit: 4599f9fe38a74a3fe16d4ab4b83b00b18c305465
Parents: de8399a
Author: Vinod Kone <[email protected]>
Authored: Thu Aug 13 18:49:57 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Thu Aug 13 23:12:02 2015 -0700

----------------------------------------------------------------------
 src/scheduler/scheduler.cpp | 111 +++++++++++++++++++++++++--------------
 1 file changed, 71 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4599f9fe/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 37b5457..cf433ff 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -193,49 +193,18 @@ public:
 
   void send(const Call& call)
   {
-    if (master.isNone()) {
-      drop(call, "Disconnected");
-      return;
-    }
-
-    Option<Error> error = validation::scheduler::call::validate(devolve(call));
+    // NOTE: We enqueue the calls to guarantee that a call is sent only after
+    // a response has been received for the previous call.
+    // TODO(vinod): Use HTTP pipelining instead.
+    calls.push(call);
 
-    if (error.isSome()) {
-      drop(call, error.get().message);
+    if (calls.size() > 1) {
       return;
     }
 
-    // TODO(vinod): Add support for sending MESSAGE calls directly
-    // to the slave, instead of relaying it through the master, as
-    // the scheduler driver does.
-
-    const string body = serialize(contentType, call);
-    const hashmap<string, string> headers{{"Accept", stringify(contentType)}};
-
-    Future<Response> response;
-
-    if (call.type() == Call::SUBSCRIBE) {
-      // Each subscription requires a new connection.
-      disconnect();
-
-      // Send a streaming request for Subscribe call.
-      response = process::http::streaming::post(
-          master.get(),
-          "api/v1/scheduler",
-          headers,
-          body,
-          stringify(contentType));
-    } else {
-      response = post(
-          master.get(),
-          "api/v1/scheduler",
-          headers,
-          body,
-          stringify(contentType));
-    }
-
-    response
-      .onAny(defer(self(), &Self::_send, call, lambda::_1));
+    // If this is the first in the queue send the call.
+    _send(call)
+      .onAny(defer(self(), &Self::___send));
   }
 
 protected:
@@ -317,7 +286,55 @@ protected:
     LOG(WARNING) << "Dropping " << call.type() << ": " << message;
   }
 
-  void _send(const Call& call, const Future<Response>& response)
+  Future<Nothing> _send(const Call& call)
+  {
+    if (master.isNone()) {
+      drop(call, "Disconnected");
+      return Nothing();
+    }
+
+    Option<Error> error = validation::scheduler::call::validate(devolve(call));
+
+    if (error.isSome()) {
+      drop(call, error.get().message);
+      return Nothing();
+    }
+
+    // TODO(vinod): Add support for sending MESSAGE calls directly
+    // to the slave, instead of relaying it through the master, as
+    // the scheduler driver does.
+
+    const string body = serialize(contentType, call);
+    const hashmap<string, string> headers{{"Accept", stringify(contentType)}};
+
+    Future<Response> response;
+
+    if (call.type() == Call::SUBSCRIBE) {
+      // Each subscription requires a new connection.
+      disconnect();
+
+      // Send a streaming request for Subscribe call.
+      response = process::http::streaming::post(
+          master.get(),
+          "api/v1/scheduler",
+          headers,
+          body,
+          stringify(contentType));
+    } else {
+      response = post(
+          master.get(),
+          "api/v1/scheduler",
+          headers,
+          body,
+          stringify(contentType));
+    }
+
+    return response
+      .onAny(defer(self(), &Self::__send, call, lambda::_1))
+      .then([]() { return Nothing(); });
+  }
+
+  void __send(const Call& call, const Future<Response>& response)
   {
     CHECK(!response.isDiscarded());
 
@@ -363,6 +380,18 @@ protected:
           stringify(call.type()) + " call: " + response.get().body);
   }
 
+  void ___send()
+  {
+    CHECK_LT(0, calls.size());
+    calls.pop();
+
+    // Execute the next event in the queue.
+    if (!calls.empty()) {
+      _send(calls.front())
+        .onAny(defer(self(), &Self::___send));
+    }
+  }
+
   void read()
   {
     connection.get().decoder->read()
@@ -468,6 +497,8 @@ private:
 
   queue<Event> events;
 
+  queue<Call> calls;
+
   Option<UPID> master;
 };
 

Reply via email to