Repository: mesos
Updated Branches:
  refs/heads/master 52b9c0bec -> 0bdd72ad7


Revert "Revert "Updated scheduler library to HTTP.""

This reverts commit ddbd429708f17caca593b4ba52cb10661066a39e.

Review: https://reviews.apache.org/r/37465


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/de8399ae
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/de8399ae
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/de8399ae

Branch: refs/heads/master
Commit: de8399aeb4d44e518619fd38b41eb20813a1cd5e
Parents: cbd8b6e
Author: Vinod Kone <[email protected]>
Authored: Thu Aug 13 17:15:37 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Thu Aug 13 23:09:40 2015 -0700

----------------------------------------------------------------------
 src/common/http.hpp         |  30 ++++++
 src/scheduler/scheduler.cpp | 203 ++++++++++++++++++++++++++++++++++++++-
 2 files changed, 228 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/de8399ae/src/common/http.hpp
----------------------------------------------------------------------
diff --git a/src/common/http.hpp b/src/common/http.hpp
index 6f4f686..0eab8ba 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -25,6 +25,7 @@
 
 #include <stout/hashmap.hpp>
 #include <stout/json.hpp>
+#include <stout/protobuf.hpp>
 
 namespace mesos {
 
@@ -69,6 +70,35 @@ std::string serialize(
     const google::protobuf::Message& message);
 
 
+// Deserializes a string message into a protobuf message based on the
+// HTTP content type.
+template <typename Message>
+Try<Message> deserialize(
+    ContentType contentType,
+    const std::string& body)
+{
+  switch (contentType) {
+    case ContentType::PROTOBUF: {
+      Message message;
+      if (!message.ParseFromString(body)) {
+        return Error("Failed to parse body into a protobuf object");
+      }
+      return message;
+    }
+    case ContentType::JSON: {
+      Try<JSON::Value> value = JSON::parse(body);
+      if (value.isError()) {
+        return Error("Failed to parse body into JSON: " + value.error());
+      }
+
+      return ::protobuf::parse<Message>(value.get());
+    }
+  }
+
+  UNREACHABLE();
+}
+
+
 JSON::Object model(const Resources& resources);
 JSON::Object model(const hashmap<std::string, Resources>& roleResources);
 JSON::Object model(const Attributes& attributes);

http://git-wip-us.apache.org/repos/asf/mesos/blob/de8399ae/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 36d7052..37b5457 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -26,6 +26,7 @@
 #include <arpa/inet.h>
 
 #include <iostream>
+#include <queue>
 #include <string>
 #include <sstream>
 
@@ -47,13 +48,18 @@
 #include <stout/duration.hpp>
 #include <stout/error.hpp>
 #include <stout/flags.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/ip.hpp>
 #include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
+#include <stout/recordio.hpp>
 #include <stout/uuid.hpp>
 
+#include "common/http.hpp"
+#include "common/recordio.hpp"
+
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
@@ -77,8 +83,17 @@ using std::queue;
 using std::string;
 using std::vector;
 
+using mesos::internal::recordio::Reader;
+
+using process::Owned;
 using process::wait; // Necessary on some OS's to disambiguate.
 
+using process::http::Pipe;
+using process::http::post;
+using process::http::Response;
+
+using ::recordio::Decoder;
+
 namespace mesos {
 namespace v1 {
 namespace scheduler {
@@ -91,10 +106,12 @@ class MesosProcess : public ProtobufProcess<MesosProcess>
 public:
   MesosProcess(
       const string& master,
+      ContentType _contentType,
       const lambda::function<void(void)>& _connected,
       const lambda::function<void(void)>& _disconnected,
       lambda::function<void(const queue<Event>&)> _received)
     : ProcessBase(ID::generate("scheduler")),
+      contentType(_contentType),
       connected(_connected),
       disconnected(_disconnected),
       received(_received),
@@ -160,6 +177,8 @@ public:
 
   virtual ~MesosProcess()
   {
+    disconnect();
+
     // Check and see if we need to shutdown a local cluster.
     if (local) {
       local::shutdown();
@@ -172,7 +191,7 @@ public:
   // TODO(benh): Move this to 'protected'.
   using ProtobufProcess<MesosProcess>::send;
 
-  void send(Call call)
+  void send(const Call& call)
   {
     if (master.isNone()) {
       drop(call, "Disconnected");
@@ -189,7 +208,34 @@ public:
     // TODO(vinod): Add support for sending MESSAGE calls directly
     // to the slave, instead of relaying it through the master, as
     // the scheduler driver does.
-    send(master.get(), devolve(call));
+
+    const string body = serialize(contentType, call);
+    const hashmap<string, string> headers{{"Accept", stringify(contentType)}};
+
+    Future<Response> response;
+
+    if (call.type() == Call::SUBSCRIBE) {
+      // Each subscription requires a new connection.
+      disconnect();
+
+      // Send a streaming request for the Subscribe call.
+      response = process::http::streaming::post(
+          master.get(),
+          "api/v1/scheduler",
+          headers,
+          body,
+          stringify(contentType));
+    } else {
+      response = post(
+          master.get(),
+          "api/v1/scheduler",
+          headers,
+          body,
+          stringify(contentType));
+    }
+
+    response
+      .onAny(defer(self(), &Self::_send, call, lambda::_1));
   }
 
 protected:
@@ -209,6 +255,9 @@ protected:
       return;
     }
 
+    // Disconnect the reader upon a master detection callback.
+    disconnect();
+
     if (future.get().isNone()) {
       master = None();
 
@@ -260,7 +309,7 @@ protected:
 
     error->set_message(message);
 
-    receive(None(), event);
+    receive(event, true);
   }
 
   void drop(const Call& call, const string& message)
@@ -268,7 +317,145 @@ protected:
     LOG(WARNING) << "Dropping " << call.type() << ": " << message;
   }
 
+  void _send(const Call& call, const Future<Response>& response)
+  {
+    CHECK(!response.isDiscarded());
+
+    // This can happen during a master failover or a network blip
+    // causing the socket to time out. Eventually, the scheduler would
+    // detect the disconnection via ZK(disconnect()) or lack of heartbeats.
+    if (response.isFailed()) {
+      LOG(ERROR) << "Request for call type " << call.type() << " failed: "
+                 << response.failure();
+      return;
+    }
+
+    if (call.type() == Call::SUBSCRIBE &&
+        response.get().status == process::http::statuses[200] ) {
+      CHECK_EQ(response.get().type, http::Response::PIPE);
+      CHECK_SOME(response.get().reader);
+
+      Pipe::Reader reader = response.get().reader.get();
+
+      auto deserializer =
+        lambda::bind(deserialize<Event>, contentType, lambda::_1);
+
+      Owned<Reader<Event>> decoder(
+          new Reader<Event>(Decoder<Event>(deserializer), reader));
+
+      connection = Connection {reader, decoder};
+
+      read();
+      return;
+    }
+
+    if (call.type() != Call::SUBSCRIBE &&
+        response.get().status == process::http::statuses[202] ) {
+      return;
+    }
+
+    // We should be able to get here only for AuthN errors; AuthN is not
+    // yet supported for HTTP frameworks. The other possible scenario
+    // can be that the master was not able to de-serialize the Call
+    // message. Since we validate the Call messages before sending, this
+    // can only happen when a packet corruption happens.
+    error("Received unexpected '" + response.get().status + "' for " +
+          stringify(call.type()) + " call: " + response.get().body);
+  }
+
+  void read()
+  {
+    connection.get().decoder->read()
+      .onAny(defer(self(),
+                   &Self::_read,
+                   connection.get().reader,
+                   lambda::_1));
+  }
+
+  void _read(const Pipe::Reader& reader, const Future<Result<Event>>& event)
+  {
+    CHECK(!event.isDiscarded());
+
+    // Ignore enqueued events from the previous Subscribe call reader.
+    if (!connection.isSome() || connection.get().reader != reader) {
+      VLOG(1) << "Ignoring event from old stale connection";
+      return;
+    }
+
+    // This could happen if the master failed over while sending a response.
+    // It's fine to drop this as the scheduler would detect the
+    // disconnection via ZK(disconnect) or lack of heartbeats.
+    if (event.isFailed()) {
+      LOG(ERROR) << "Failed to decode the stream of events: "
+                 << event.failure();
+      return;
+    }
+
+    if (!event.get().isSome()) {
+      // It's fine to drop this as the scheduler would detect the
+      // disconnection via ZK(disconnect) or lack of heartbeats.
+      LOG(ERROR) << "End-Of-File received from master."
+                 << " The master closed the event stream";
+      return;
+    }
+
+    if (event.get().isError()) {
+      error("Failed to de-serialize event: " + event.get().error());
+    } else {
+      receive(event.get().get(), false);
+    }
+
+    read();
+  }
+
+  void receive(const Event& event, bool isLocallyInjected)
+  {
+    // Check if we're disconnected but received an event.
+    if (!isLocallyInjected && master.isNone()) {
+      LOG(WARNING) << "Ignoring " << stringify(event.type())
+                   << " event because we're disconnected";
+      return;
+    }
+
+    if (isLocallyInjected) {
+      VLOG(1) << "Enqueuing locally injected event "
+              << stringify(event.type());
+    }
+
+    // Queue up the event and invoke the 'received' callback if this
+    // is the first event (between now and when the 'received'
+    // callback actually gets invoked more events might get queued).
+    events.push(event);
+
+    if (events.size() == 1) {
+      mutex.lock()
+        .then(defer(self(), &Self::_receive))
+        .onAny(lambda::bind(&Mutex::unlock, mutex));
+    }
+  }
+
+  void disconnect()
+  {
+    if (connection.isSome()) {
+      if (!connection.get().reader.close()) {
+        LOG(WARNING) << "HTTP connection was already closed";
+      }
+    }
+
+    connection = None();
+  }
+
 private:
+  struct Connection
+  {
+    Pipe::Reader reader;
+    process::Owned<Reader<Event>> decoder;
+  };
+
+  Option<Connection> connection;
+
+  ContentType contentType;
+
   Mutex mutex; // Used to serialize the callback invocations.
 
   lambda::function<void(void)> connected;
@@ -291,8 +478,14 @@ Mesos::Mesos(
     const lambda::function<void(void)>& disconnected,
     const lambda::function<void(const queue<Event>&)>& received)
 {
-  process =
-    new MesosProcess(master, connected, disconnected, received);
+  // TODO(anand): Make ContentType a constructor argument.
+  process = new MesosProcess(
+      master,
+      ContentType::PROTOBUF,
+      connected,
+      disconnected,
+      received);
+
   spawn(process);
 }
 

Reply via email to