This is an automated email from the ASF dual-hosted git repository. bennoe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 035c4f60ea770dad00b6655597dbdb35b9cb6c09 Author: Benjamin Bannier <[email protected]> AuthorDate: Fri Nov 29 11:55:44 2019 +0100 Handled `/api/v1` and /api/v1/executor` over agent executor socket. This patch wires up the agent executor socket to handle calls for `/api/v1` and `/api/v1/executor` so executors can use domain sockets to communicate with the agent; executors use the latter to subscribe with the agent and the former to e.g., launch containers. Note that with this patch we now expose the full operator API over the agent's domain socket. Review: https://reviews.apache.org/r/71854 --- src/slave/slave.cpp | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/slave/slave.hpp | 1 + 2 files changed, 64 insertions(+) diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 2607b0b..0005971 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -162,6 +162,7 @@ using process::Failure; using process::Future; using process::Owned; using process::PID; +using process::Promise; using process::Time; using process::UPID; @@ -775,6 +776,68 @@ void Slave::initialize() }, options); + if (executorSocket.isSome()) { + // We use `http::Server` to manage the communication channel. + // Since `http::Server` currently doesn't offer support for + // authentication we then inject the request received by the + // server into normal agent rounting logic. + Try<http::Server> server = http::Server::create( + *executorSocket, + process::defer( + self(), + [this](const process::network::Socket&, http::Request request) + -> Future<http::Response> { + // Restrict access to only allow `/slave(N)/api/v1/executor` + // and `/slave(N)/api/v1`. Executors need to be able to + // access the first to subscribe and the latter to e.g., + // launch containers or perform other operator API calls. + string selfPrefix = "/" + self().id; + if (request.url.path != selfPrefix + "/api/v1/executor" && + request.url.path != selfPrefix + "/api/v1") { + LOG(INFO) + << "Blocking request for " << request.url.path + << " over executor socket"; + return http::Forbidden(); + } + + // Create an `HttpEvent` with the needed information which we can + // be consumed by the agent. The event contains e.g., the + // requested path so the expected route `/api/v1/executor` is + // routed when consuming the event. + std::unique_ptr<Promise<http::Response>> promise( + new Promise<http::Response>()); + + Future<http::Response> response = promise->future(); + + process::HttpEvent event( + std::unique_ptr<http::Request>(new http::Request(request)), + std::move(promise)); + + std::move(event).consume(this); + + return response; + }), + { + /* .scheme =*/process::http::Scheme::HTTP_UNIX, + /* .backlog =*/16384, + }); + + if (server.isError()) { + LOG(FATAL) << "Could not start listening on executor socket: " + << server.error(); + } else { + executorSocketServer = std::move(*server); + + Future<Nothing> executorSocketServerTerminated = + executorSocketServer->run(); + + if (executorSocketServerTerminated.isFailed()) { + LOG(FATAL) << "Could not start listening on executor socket: " + << executorSocketServerTerminated.failure(); + } + } + } + route("/api/v1/executor", EXECUTOR_HTTP_AUTHENTICATION_REALM, Http::EXECUTOR_HELP(), diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 0f3f502..3d191dc 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -873,6 +873,7 @@ private: PendingFutureTracker* futureTracker; Option<process::network::unix::Socket> executorSocket; + Option<process::http::Server> executorSocketServer; const Option<Authorizer*> authorizer;
