Fixed scheduler library to send calls in order. Review: https://reviews.apache.org/r/37467
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4599f9fe Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4599f9fe Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4599f9fe Branch: refs/heads/master Commit: 4599f9fe38a74a3fe16d4ab4b83b00b18c305465 Parents: de8399a Author: Vinod Kone <[email protected]> Authored: Thu Aug 13 18:49:57 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu Aug 13 23:12:02 2015 -0700 ---------------------------------------------------------------------- src/scheduler/scheduler.cpp | 111 +++++++++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/4599f9fe/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index 37b5457..cf433ff 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -193,49 +193,18 @@ public: void send(const Call& call) { - if (master.isNone()) { - drop(call, "Disconnected"); - return; - } - - Option<Error> error = validation::scheduler::call::validate(devolve(call)); + // NOTE: We enqueue the calls to guarantee that a call is sent only after + // a response has been received for the previous call. + // TODO(vinod): Use HTTP pipelining instead. + calls.push(call); - if (error.isSome()) { - drop(call, error.get().message); + if (calls.size() > 1) { return; } - // TODO(vinod): Add support for sending MESSAGE calls directly - // to the slave, instead of relaying it through the master, as - // the scheduler driver does. - - const string body = serialize(contentType, call); - const hashmap<string, string> headers{{"Accept", stringify(contentType)}}; - - Future<Response> response; - - if (call.type() == Call::SUBSCRIBE) { - // Each subscription requires a new connection. - disconnect(); - - // Send a streaming request for Subscribe call. - response = process::http::streaming::post( - master.get(), - "api/v1/scheduler", - headers, - body, - stringify(contentType)); - } else { - response = post( - master.get(), - "api/v1/scheduler", - headers, - body, - stringify(contentType)); - } - - response - .onAny(defer(self(), &Self::_send, call, lambda::_1)); + // If this is the first in the queue send the call. + _send(call) + .onAny(defer(self(), &Self::___send)); } protected: @@ -317,7 +286,55 @@ protected: LOG(WARNING) << "Dropping " << call.type() << ": " << message; } - void _send(const Call& call, const Future<Response>& response) + Future<Nothing> _send(const Call& call) + { + if (master.isNone()) { + drop(call, "Disconnected"); + return Nothing(); + } + + Option<Error> error = validation::scheduler::call::validate(devolve(call)); + + if (error.isSome()) { + drop(call, error.get().message); + return Nothing(); + } + + // TODO(vinod): Add support for sending MESSAGE calls directly + // to the slave, instead of relaying it through the master, as + // the scheduler driver does. + + const string body = serialize(contentType, call); + const hashmap<string, string> headers{{"Accept", stringify(contentType)}}; + + Future<Response> response; + + if (call.type() == Call::SUBSCRIBE) { + // Each subscription requires a new connection. + disconnect(); + + // Send a streaming request for Subscribe call. + response = process::http::streaming::post( + master.get(), + "api/v1/scheduler", + headers, + body, + stringify(contentType)); + } else { + response = post( + master.get(), + "api/v1/scheduler", + headers, + body, + stringify(contentType)); + } + + return response + .onAny(defer(self(), &Self::__send, call, lambda::_1)) + .then([]() { return Nothing(); }); + } + + void __send(const Call& call, const Future<Response>& response) { CHECK(!response.isDiscarded()); @@ -363,6 +380,18 @@ protected: stringify(call.type()) + " call: " + response.get().body); } + void ___send() + { + CHECK_LT(0, calls.size()); + calls.pop(); + + // Execute the next event in the queue. + if (!calls.empty()) { + _send(calls.front()) + .onAny(defer(self(), &Self::___send)); + } + } + void read() { connection.get().decoder->read() @@ -468,6 +497,8 @@ private: queue<Event> events; + queue<Call> calls; + Option<UPID> master; };
