Introduced executor HTTP endpoint on agent. This change introduces a stub endpoint on agent. As of now, it doesn't do much except validating the `Content-Type`, `Accept` headers among other trivial validations. Most of the functionality already existed in `src/master/http.cpp`.
Review: https://reviews.apache.org/r/38497 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/50805083 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/50805083 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/50805083 Branch: refs/heads/master Commit: 50805083d4045126fc44ff2d027428a04fc2e636 Parents: 067df4b Author: Anand Mazumdar <[email protected]> Authored: Mon Sep 21 15:01:50 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Mon Sep 21 15:05:06 2015 -0700 ---------------------------------------------------------------------- src/slave/http.cpp | 137 +++++++++++++++++++++++++++++++++++++++++++++++ src/slave/slave.cpp | 7 +++ src/slave/slave.hpp | 5 ++ 3 files changed, 149 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/50805083/src/slave/http.cpp ---------------------------------------------------------------------- diff --git a/src/slave/http.cpp b/src/slave/http.cpp index 101aa06..12a4d39 100644 --- a/src/slave/http.cpp +++ b/src/slave/http.cpp @@ -22,6 +22,10 @@ #include <string> #include <vector> +#include <mesos/executor/executor.hpp> + +#include <mesos/v1/executor/executor.hpp> + #include <mesos/type_utils.hpp> #include <process/help.hpp> @@ -41,6 +45,8 @@ #include "common/build.hpp" #include "common/http.hpp" +#include "internal/devolve.hpp" + #include "mesos/mesos.hpp" #include "mesos/resources.hpp" @@ -55,8 +61,17 @@ using process::Owned; using process::TLDR; using process::USAGE; +using process::http::Accepted; +using process::http::BadRequest; +using process::http::Forbidden; using process::http::InternalServerError; +using process::http::MethodNotAllowed; +using process::http::NotAcceptable; +using process::http::NotImplemented; using process::http::OK; +using process::http::Pipe; +using process::http::ServiceUnavailable; +using process::http::UnsupportedMediaType; using process::metrics::internal::MetricsProcess; @@ -182,6 +197,128 @@ void Slave::Http::log(const Request& request) } +const string Slave::Http::EXECUTOR_HELP = HELP( + TLDR( + "Endpoint for the Executor HTTP API."), + DESCRIPTION( + "This endpoint is used by the executors to interact with the ", + "agent via Call/Event messages." + "Returns 200 OK iff the initial SUBSCRIBE Call is successful." + "This would result in a streaming response via chunked " + "transfer encoding. The executors can process the response " + "incrementally." + "Returns 202 Accepted for all other Call messages iff the " + "request is accepted.")); + + +Future<Response> Slave::Http::executor(const Request& request) const +{ + // TODO(anand): Add metrics for rejected requests. + + if (slave->state == Slave::RECOVERING) { + return ServiceUnavailable("Agent has not finished recovery"); + } + + if (request.method != "POST") { + return MethodNotAllowed( + "Expecting a 'POST' request, received '" + request.method + "'"); + } + + v1::executor::Call v1Call; + + Option<string> contentType = request.headers.get("Content-Type"); + if (contentType.isNone()) { + return BadRequest("Expecting 'Content-Type' to be present"); + } + + if (contentType.get() == APPLICATION_PROTOBUF) { + if (!v1Call.ParseFromString(request.body)) { + return BadRequest("Failed to parse body into Call protobuf"); + } + } else if (contentType.get() == APPLICATION_JSON) { + Try<JSON::Value> value = JSON::parse(request.body); + if (value.isError()) { + return BadRequest("Failed to parse body into JSON: " + value.error()); + } + + Try<v1::executor::Call> parse = + ::protobuf::parse<v1::executor::Call>(value.get()); + + if (parse.isError()) { + return BadRequest("Failed to convert JSON into Call protobuf: " + + parse.error()); + } + + v1Call = parse.get(); + } else { + return UnsupportedMediaType( + string("Expecting 'Content-Type' of ") + + APPLICATION_JSON + " or " + APPLICATION_PROTOBUF); + } + + const executor::Call call = devolve(v1Call); + + // TODO(anand): Validate the protobuf (MESOS-2906) before proceeding + // further. + + if (call.type() == executor::Call::SUBSCRIBE) { + // We default to JSON since an empty 'Accept' header + // results in all media types considered acceptable. + ContentType responseContentType; + + if (request.acceptsMediaType(APPLICATION_JSON)) { + responseContentType = ContentType::JSON; + } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) { + responseContentType = ContentType::PROTOBUF; + } else { + return NotAcceptable( + string("Expecting 'Accept' to allow ") + + "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'"); + } + + Pipe pipe; + OK ok; + ok.headers["Content-Type"] = stringify(responseContentType); + + ok.type = Response::PIPE; + ok.reader = pipe.reader(); + + return ok; + } + + + // We consolidate the framework/executor lookup logic here because + // it is common for all the call handlers. + Framework* framework = slave->getFramework(call.framework_id()); + if (framework == NULL) { + return BadRequest("Framework cannot be found"); + } + + Executor* executor = framework->getExecutor(call.executor_id()); + if (executor == NULL) { + return BadRequest("Executor cannot be found"); + } + + if (executor->state == Executor::REGISTERING) { + return Forbidden("Executor is not subscribed"); + } + + switch (call.type()) { + case executor::Call::UPDATE: + return Accepted(); + + case executor::Call::MESSAGE: + return Accepted(); + + default: + // Should be caught during call validation above. + LOG(FATAL) << "Unexpected " << call.type() << " call"; + } + + return NotImplemented(); +} + + const string Slave::Http::HEALTH_HELP = HELP( TLDR( "Health check of the Slave."), http://git-wip-us.apache.org/repos/asf/mesos/blob/50805083/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index ad710d7..29865ec 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -494,6 +494,13 @@ void Slave::initialize() // Setup HTTP routes. Http http = Http(this); + route("/api/v1/executor", + Http::EXECUTOR_HELP, + [http](const process::http::Request& request) { + Http::log(request); + return http.executor(request); + }); + // TODO(ijimenez): Remove this endpoint at the end of the // deprecation cycle on 0.26. route("/state.json", http://git-wip-us.apache.org/repos/asf/mesos/blob/50805083/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 32e1830..7a54fad 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -399,6 +399,10 @@ private: // desired request handler to get consistent request logging. static void log(const process::http::Request& request); + // /slave/api/v1/executor + process::Future<process::http::Response> executor( + const process::http::Request& request) const; + // /slave/health process::Future<process::http::Response> health( const process::http::Request& request) const; @@ -407,6 +411,7 @@ private: process::Future<process::http::Response> state( const process::http::Request& request) const; + static const std::string EXECUTOR_HELP; static const std::string HEALTH_HELP; static const std::string STATE_HELP;
