Added a `call()` method to the v1 scheduler library. This patch adds a `call()` method to the scheduler library that allows clients to send a `v1::scheduler::Call` to the master and receive a `v1::scheduler::Response`.
It will be used to test operation state reconciliation. Review: https://reviews.apache.org/r/66460/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c39ef695 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c39ef695 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c39ef695 Branch: refs/heads/master Commit: c39ef69514e57ca7c90e764a4a617abf88cd144f Parents: 949b44e Author: Gaston Kleiman <[email protected]> Authored: Mon Apr 23 13:43:56 2018 -0700 Committer: Greg Mann <[email protected]> Committed: Mon Apr 23 13:50:00 2018 -0700 ---------------------------------------------------------------------- include/mesos/v1/scheduler.hpp | 39 +++++++ include/mesos/v1/scheduler/scheduler.proto | 34 ++++++ .../org_apache_mesos_v1_scheduler_V0Mesos.cpp | 9 ++ src/scheduler/scheduler.cpp | 111 +++++++++++++++++++ 4 files changed, 193 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/include/mesos/v1/scheduler.hpp ---------------------------------------------------------------------- diff --git a/include/mesos/v1/scheduler.hpp b/include/mesos/v1/scheduler.hpp index d56e088..b1dfbb1 100644 --- a/include/mesos/v1/scheduler.hpp +++ b/include/mesos/v1/scheduler.hpp @@ -28,6 +28,10 @@ #include <mesos/v1/scheduler/scheduler.hpp> +#include <process/future.hpp> + +#include <stout/option.hpp> + namespace mesos { namespace master { @@ -48,6 +52,7 @@ public: // Empty virtual destructor (necessary to instantiate subclasses). virtual ~MesosBase() {} virtual void send(const Call& call) = 0; + virtual process::Future<APIResult> call(const Call& callMessage) = 0; virtual void reconnect() = 0; }; @@ -94,6 +99,40 @@ public: // disconnected). virtual void send(const Call& call) override; + // Attempts to send a call to the master, returning the response. 
+ // + // The scheduler should only invoke this method once it has subscribed + // with the master. Otherwise, a `Failure` will be returned. + // + // Some local validation of calls is performed, and the request will not be + // sent to the master if the validation fails. + // + // A `Failure` will be returned on validation failures or if an error happens + // when sending the request to the master, e.g., a master disconnection, or a + // deserialization error. + // + // If it was possible to receive a response from the server, the returned + // object will contain the HTTP response status code. + // + // There are three cases to consider depending on the HTTP response status + // code: + // + // (1) '202 ACCEPTED': Indicates the call was accepted for processing and + // neither `APIResult::response` nor `APIResult::error` will be set. + // + // (2) '200 OK': Indicates the call completed successfully. + // `APIResult::response` will be set if the `scheduler::Call::Type` + // has a corresponding `scheduler::Response::Type`; `APIResult::error` + // will not be set. + // + // (3) For all other HTTP status codes, the `APIResult::response` field will + // not be set and the `APIResult::error` field may be set to provide more + // information. + // + // Note: This method cannot be used to send `SUBSCRIBE` calls; use `send()` + // instead. + virtual process::Future<APIResult> call(const Call& callMessage) override; + // Force a reconnection with the master. 
// // In the case of a one-way network partition, the connection between the http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/include/mesos/v1/scheduler/scheduler.proto ---------------------------------------------------------------------- diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto index b912901..fcfec5e 100644 --- a/include/mesos/v1/scheduler/scheduler.proto +++ b/include/mesos/v1/scheduler/scheduler.proto @@ -495,3 +495,37 @@ message Call { optional Request request = 11; optional Suppress suppress = 16; } + +/** + * This message is used by the C++ Scheduler HTTP API library as the return + * type of the `call()` method. The message includes the HTTP status code with + * which the master responded, and optionally a `scheduler::Response` message. + * + * There are three cases to consider depending on the HTTP response status code: + * + * (1) '202 ACCEPTED': Indicates the call was accepted for processing and + * neither `response` nor `error` will be set. + * + * (2) '200 OK': Indicates the call completed successfully, and the `response` + * field will be set if the `scheduler::Call::Type` has a corresponding + * `scheduler::Response::Type`; `error` will not be set. + * + * (3) For all other HTTP status codes, the `response` field will not be set + * and the `error` field may be set to provide more information. + * + * NOTE: This message is used by the C++ Scheduler HTTP API library and is not + * part of the API specification. + */ +message APIResult { + // HTTP status code with which the master responded. + required uint32 status_code = 1; + + // This field will only be set if the call completed successfully and the + // master responded with `200 OK` and a non-empty body. + optional Response response = 2; + + // This field will only be set if the call did not complete successfully and + // the master responded with a status other than `202 Accepted` or `200 OK`, + // and with a non-empty body. 
+ optional string error = 3; +} http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp ---------------------------------------------------------------------- diff --git a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp index 60b17b9..ea8d54f 100644 --- a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp +++ b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp @@ -28,6 +28,7 @@ #include <process/clock.hpp> #include <process/delay.hpp> #include <process/dispatch.hpp> +#include <process/future.hpp> #include <process/id.hpp> #include <process/owned.hpp> #include <process/process.hpp> @@ -134,6 +135,14 @@ public: UNREACHABLE(); } + virtual process::Future<v1::scheduler::APIResult> call( + const v1::scheduler::Call& callMessage) override + { + // The driver does not support sending a `v1::scheduler::Call` that returns + // a `v1::scheduler::Response`. + UNREACHABLE(); + } + process::Owned<V0ToV1AdapterProcess> process; private: http://git-wip-us.apache.org/repos/asf/mesos/blob/c39ef695/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index ecef916..c0dff53 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -114,6 +114,7 @@ using mesos::internal::recordio::Reader; using mesos::master::detector::MasterDetector; using process::collect; +using process::Failure; using process::Owned; using process::wait; // Necessary on some OS's to disambiguate. 
@@ -263,6 +264,49 @@ public: .onAny(defer(self(), &Self::_send, call, lambda::_1)); } + Future<APIResult> call(const Call& callMessage) + { + Option<Error> error = + validation::scheduler::call::validate(devolve(callMessage)); + + if (error.isSome()) { + return Failure(error->message); + } + + if (callMessage.type() == Call::SUBSCRIBE) { + return Failure("This method doesn't support SUBSCRIBE calls"); + } + + if (state != SUBSCRIBED) { + return Failure( + "Cannot perform calls until subscribed. Current state: " + + stringify(state)); + } + + VLOG(1) << "Sending " << callMessage.type() << " call to " << master.get(); + + // TODO(vinod): Add support for sending MESSAGE calls directly + // to the slave, instead of relaying it through the master, as + // the scheduler driver does. + + process::http::Request request; + request.method = "POST"; + request.url = master.get(); + request.body = serialize(contentType, callMessage); + request.keepAlive = true; + request.headers = {{"Accept", stringify(contentType)}, + {"Content-Type", stringify(contentType)}}; + + // TODO(tillt): Add support for multi-step authentication protocols. + return authenticatee->authenticate(request, credential) + .recover([](const Future<process::http::Request>& future) { + return Failure( + stringify("HTTP authenticatee ") + + (future.isFailed() ? "failed: " + future.failure() : "discarded")); + }) + .then(defer(self(), &Self::_call, callMessage, lambda::_1)); + } + void reconnect() { // Ignore the reconnection request if we are currently disconnected @@ -675,6 +719,68 @@ protected: response->body + ") for " + stringify(call.type())); } + Future<APIResult> _call( + const Call& callMessage, + process::http::Request request) + { + if (connections.isNone()) { + return Failure("Connection to master interrupted"); + } + + Future<process::http::Response> response; + + CHECK_SOME(streamId); + + // Set the stream ID associated with this connection. 
+ request.headers["Mesos-Stream-Id"] = streamId->toString(); + + CHECK_SOME(connectionId); + + return connections->nonSubscribe.send(request) + .then(defer(self(), + &Self::__call, + callMessage, + lambda::_1)); + } + + Future<APIResult> __call( + const Call& callMessage, + const process::http::Response& response) + { + APIResult result; + + result.set_status_code(response.code); + + if (response.code == process::http::Status::ACCEPTED) { + // "202 Accepted" responses are asynchronously processed, so the body + // should be empty. + if (!response.body.empty()) { + LOG(WARNING) << "Response for " << callMessage.type() + << " unexpectedly included body: '" << response.body + << "'"; + } + } else if (response.code == process::http::Status::OK) { + if (!response.body.empty()) { + Try<Response> deserializedResponse = + deserialize<Response>(contentType, response.body); + + if (deserializedResponse.isError()) { + return Failure( + "Failed to deserialize the response '" + response.status + "'" + + " (" + response.body + "): " + deserializedResponse.error()); + } + + *result.mutable_response() = deserializedResponse.get(); + } + } else { + result.set_error( + "Received unexpected '" + response.status + "'" + " (" + + response.body + ")"); + } + + return result; + } + void read() { subscribed->decoder->read() @@ -917,6 +1023,11 @@ void Mesos::send(const Call& call) dispatch(process, &MesosProcess::send, call); } +Future<APIResult> Mesos::call(const Call& callMessage) +{ + return dispatch(process, &MesosProcess::call, callMessage); +} + void Mesos::reconnect() {
