Introduced executor HTTP endpoint on agent.

This change introduces a stub endpoint on agent. As of now, it doesn't
do much except validating the `Content-Type`, `Accept` headers among
other trivial validations. Most of the functionality already existed in
`src/master/http.cpp`.

Review: https://reviews.apache.org/r/38497


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/50805083
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/50805083
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/50805083

Branch: refs/heads/master
Commit: 50805083d4045126fc44ff2d027428a04fc2e636
Parents: 067df4b
Author: Anand Mazumdar <[email protected]>
Authored: Mon Sep 21 15:01:50 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Mon Sep 21 15:05:06 2015 -0700

----------------------------------------------------------------------
 src/slave/http.cpp  | 137 +++++++++++++++++++++++++++++++++++++++++++++++
 src/slave/slave.cpp |   7 +++
 src/slave/slave.hpp |   5 ++
 3 files changed, 149 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/50805083/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 101aa06..12a4d39 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -22,6 +22,10 @@
 #include <string>
 #include <vector>
 
+#include <mesos/executor/executor.hpp>
+
+#include <mesos/v1/executor/executor.hpp>
+
 #include <mesos/type_utils.hpp>
 
 #include <process/help.hpp>
@@ -41,6 +45,8 @@
 #include "common/build.hpp"
 #include "common/http.hpp"
 
+#include "internal/devolve.hpp"
+
 #include "mesos/mesos.hpp"
 #include "mesos/resources.hpp"
 
@@ -55,8 +61,17 @@ using process::Owned;
 using process::TLDR;
 using process::USAGE;
 
+using process::http::Accepted;
+using process::http::BadRequest;
+using process::http::Forbidden;
 using process::http::InternalServerError;
+using process::http::MethodNotAllowed;
+using process::http::NotAcceptable;
+using process::http::NotImplemented;
 using process::http::OK;
+using process::http::Pipe;
+using process::http::ServiceUnavailable;
+using process::http::UnsupportedMediaType;
 
 using process::metrics::internal::MetricsProcess;
 
@@ -182,6 +197,128 @@ void Slave::Http::log(const Request& request)
 }
 
 
+const string Slave::Http::EXECUTOR_HELP = HELP(
+    TLDR(
+        "Endpoint for the Executor HTTP API."),
+    DESCRIPTION(
+        "This endpoint is used by the executors to interact with the ",
+        "agent via Call/Event messages."
+        "Returns 200 OK iff the initial SUBSCRIBE Call is successful."
+        "This would result in a streaming response via chunked "
+        "transfer encoding. The executors can process the response "
+        "incrementally."
+        "Returns 202 Accepted for all other Call messages iff the "
+        "request is accepted."));
+
+
+Future<Response> Slave::Http::executor(const Request& request) const
+{
+  // TODO(anand): Add metrics for rejected requests.
+
+  if (slave->state == Slave::RECOVERING) {
+    return ServiceUnavailable("Agent has not finished recovery");
+  }
+
+  if (request.method != "POST") {
+    return MethodNotAllowed(
+        "Expecting a 'POST' request, received '" + request.method + "'");
+  }
+
+  v1::executor::Call v1Call;
+
+  Option<string> contentType = request.headers.get("Content-Type");
+  if (contentType.isNone()) {
+    return BadRequest("Expecting 'Content-Type' to be present");
+  }
+
+  if (contentType.get() == APPLICATION_PROTOBUF) {
+    if (!v1Call.ParseFromString(request.body)) {
+      return BadRequest("Failed to parse body into Call protobuf");
+    }
+  } else if (contentType.get() == APPLICATION_JSON) {
+    Try<JSON::Value> value = JSON::parse(request.body);
+    if (value.isError()) {
+      return BadRequest("Failed to parse body into JSON: " + value.error());
+    }
+
+    Try<v1::executor::Call> parse =
+      ::protobuf::parse<v1::executor::Call>(value.get());
+
+    if (parse.isError()) {
+      return BadRequest("Failed to convert JSON into Call protobuf: " +
+                        parse.error());
+    }
+
+    v1Call = parse.get();
+  } else {
+    return UnsupportedMediaType(
+        string("Expecting 'Content-Type' of ") +
+        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
+  }
+
+  const executor::Call call = devolve(v1Call);
+
+  // TODO(anand): Validate the protobuf (MESOS-2906) before proceeding
+  // further.
+
+  if (call.type() == executor::Call::SUBSCRIBE) {
+    // We default to JSON since an empty 'Accept' header
+    // results in all media types considered acceptable.
+    ContentType responseContentType;
+
+    if (request.acceptsMediaType(APPLICATION_JSON)) {
+      responseContentType = ContentType::JSON;
+    } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+      responseContentType = ContentType::PROTOBUF;
+    } else {
+      return NotAcceptable(
+          string("Expecting 'Accept' to allow ") +
+          "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+    }
+
+    Pipe pipe;
+    OK ok;
+    ok.headers["Content-Type"] = stringify(responseContentType);
+
+    ok.type = Response::PIPE;
+    ok.reader = pipe.reader();
+
+    return ok;
+  }
+
+
+  // We consolidate the framework/executor lookup logic here because
+  // it is common for all the call handlers.
+  Framework* framework = slave->getFramework(call.framework_id());
+  if (framework == NULL) {
+    return BadRequest("Framework cannot be found");
+  }
+
+  Executor* executor = framework->getExecutor(call.executor_id());
+  if (executor == NULL) {
+    return BadRequest("Executor cannot be found");
+  }
+
+  if (executor->state == Executor::REGISTERING) {
+    return Forbidden("Executor is not subscribed");
+  }
+
+  switch (call.type()) {
+    case executor::Call::UPDATE:
+      return Accepted();
+
+    case executor::Call::MESSAGE:
+      return Accepted();
+
+    default:
+      // Should be caught during call validation above.
+      LOG(FATAL) << "Unexpected " << call.type() << " call";
+  }
+
+  return NotImplemented();
+}
+
+
 const string Slave::Http::HEALTH_HELP = HELP(
     TLDR(
         "Health check of the Slave."),

http://git-wip-us.apache.org/repos/asf/mesos/blob/50805083/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index ad710d7..29865ec 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -494,6 +494,13 @@ void Slave::initialize()
   // Setup HTTP routes.
   Http http = Http(this);
 
+  route("/api/v1/executor",
+        Http::EXECUTOR_HELP,
+        [http](const process::http::Request& request) {
+          Http::log(request);
+          return http.executor(request);
+        });
+
   // TODO(ijimenez): Remove this endpoint at the end of the
   // deprecation cycle on 0.26.
   route("/state.json",

http://git-wip-us.apache.org/repos/asf/mesos/blob/50805083/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 32e1830..7a54fad 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -399,6 +399,10 @@ private:
     // desired request handler to get consistent request logging.
     static void log(const process::http::Request& request);
 
+    // /slave/api/v1/executor
+    process::Future<process::http::Response> executor(
+        const process::http::Request& request) const;
+
     // /slave/health
     process::Future<process::http::Response> health(
         const process::http::Request& request) const;
@@ -407,6 +411,7 @@ private:
     process::Future<process::http::Response> state(
         const process::http::Request& request) const;
 
+    static const std::string EXECUTOR_HELP;
     static const std::string HEALTH_HELP;
     static const std::string STATE_HELP;
 

Reply via email to