Added HttpConnection to support http frameworks in the master.

Review: https://reviews.apache.org/r/36720


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7f352ef8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7f352ef8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7f352ef8

Branch: refs/heads/master
Commit: 7f352ef886f3116e4bef23b235d87b3182354908
Parents: d44419a
Author: Anand Mazumdar <[email protected]>
Authored: Thu Aug 6 10:37:21 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Thu Aug 6 11:25:17 2015 -0700

----------------------------------------------------------------------
 src/common/http.cpp   | 19 ++++++++++
 src/common/http.hpp   |  8 +++++
 src/master/master.cpp | 51 ++++++++++++++------------
 src/master/master.hpp | 89 +++++++++++++++++++++++++---------------------
 4 files changed, 105 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7f352ef8/src/common/http.cpp
----------------------------------------------------------------------
diff --git a/src/common/http.cpp b/src/common/http.cpp
index a74c51d..e2ff48c 100644
--- a/src/common/http.cpp
+++ b/src/common/http.cpp
@@ -27,6 +27,7 @@
 #include <stout/foreach.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/stringify.hpp>
+#include <stout/unreachable.hpp>
 
 #include "common/attributes.hpp"
 #include "common/http.hpp"
@@ -45,6 +46,24 @@ const char APPLICATION_JSON[] = "application/json";
 const char APPLICATION_PROTOBUF[] = "application/x-protobuf";
 
 
+string serialize(
+    ContentType contentType,
+    const google::protobuf::Message& message)
+{
+  switch (contentType) {
+    case ContentType::PROTOBUF: {
+      return message.SerializeAsString();
+    }
+    case ContentType::JSON: {
+      JSON::Object object = JSON::Protobuf(message);
+      return stringify(object);
+    }
+  }
+
+  UNREACHABLE();
+}
+
+
 // TODO(bmahler): Kill these in favor of automatic Proto->JSON
 // Conversion (when it becomes available).
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7f352ef8/src/common/http.hpp
----------------------------------------------------------------------
diff --git a/src/common/http.hpp b/src/common/http.hpp
index 9e4290f..98a1270 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -46,6 +46,14 @@ enum class ContentType
   JSON
 };
 
+
+// Serializes a protobuf message for transmission
+// based on the HTTP content type.
+std::string serialize(
+    ContentType contentType,
+    const google::protobuf::Message& message);
+
+
 JSON::Object model(const Resources& resources);
 JSON::Object model(const hashmap<std::string, Resources>& roleResources);
 JSON::Object model(const Attributes& attributes);

http://git-wip-us.apache.org/repos/asf/mesos/blob/7f352ef8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 50b9824..d699e4b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -960,10 +960,11 @@ void Master::finalize()
 }
 
 
-void Master::exited(const FrameworkID& frameworkId, Pipe::Writer writer)
+void Master::exited(const FrameworkID& frameworkId, const HttpConnection& http)
 {
   foreachvalue (Framework* framework, frameworks.registered) {
-    if (framework->http.isSome() && framework->http.get().writer == writer) {
+    if (framework->http.isSome() &&
+        framework->http.get().writer == http.writer) {
       CHECK_EQ(frameworkId, framework->id());
       _exited(framework);
       return;
@@ -4665,15 +4666,18 @@ void Master::addFramework(Framework* framework)
   CHECK(!frameworks.registered.contains(framework->id()))
     << "Framework " << *framework << " already exists!";
 
-  CHECK_SOME(framework->pid) << "adding http framework not implemented";
-
   frameworks.registered[framework->id()] = framework;
 
-  link(framework->pid.get());
+  if (framework->pid.isSome()) {
+    link(framework->pid.get());
+  } else {
+    CHECK_SOME(framework->http);
+
+    HttpConnection http = framework->http.get();
 
-  // TODO(anand): For http frameworks, add a readerClosed()
-  // callback to invoke Master::exited() when the connection
-  // closes.
+    http.closed()
+      .onAny(defer(self(), &Self::exited, framework->id(), http));
+  }
 
   // Enforced by Master::registerFramework.
   CHECK(roles.contains(framework->info.role()))
@@ -4695,22 +4699,25 @@ void Master::addFramework(Framework* framework)
   // If the framework is authenticated, its principal should be in
   // 'authenticated'. Otherwise look if it's supplied in
   // FrameworkInfo.
-  Option<string> principal = authenticated.get(framework->pid.get());
-  if (principal.isNone() && framework->info.has_principal()) {
-    principal = framework->info.principal();
-  }
+  if (framework->pid.isSome()) {
+    Option<string> principal = authenticated.get(framework->pid.get());
+    if (principal.isNone() && framework->info.has_principal()) {
+      principal = framework->info.principal();
+    }
 
-  CHECK(!frameworks.principals.contains(framework->pid.get()));
-  frameworks.principals.put(framework->pid.get(), principal);
+    CHECK(!frameworks.principals.contains(framework->pid.get()));
+    frameworks.principals.put(framework->pid.get(), principal);
 
-  // Export framework metrics if a principal is specified.
-  if (principal.isSome()) {
-    // Create new framework metrics if this framework is the first
-    // one of this principal. Otherwise existing metrics are reused.
-    if (!metrics->frameworks.contains(principal.get())) {
-      metrics->frameworks.put(
-          principal.get(),
-          Owned<Metrics::Frameworks>(new 
Metrics::Frameworks(principal.get())));
+    // Export framework metrics if a principal is specified.
+    if (principal.isSome()) {
+      // Create new framework metrics if this framework is the first
+      // one of this principal. Otherwise existing metrics are reused.
+      if (!metrics->frameworks.contains(principal.get())) {
+        metrics->frameworks.put(
+            principal.get(),
+            Owned<Metrics::Frameworks>(
+              new Metrics::Frameworks(principal.get())));
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/7f352ef8/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 30a2550..53420ca 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -92,6 +92,7 @@ class SlaveObserver;
 
 struct BoundedRateLimiter;
 struct Framework;
+struct HttpConnection;
 struct Role;
 
 
@@ -512,9 +513,7 @@ protected:
   virtual void visit(const process::ExitedEvent& event);
 
   virtual void exited(const process::UPID& pid);
-  void exited(
-      const FrameworkID& frameworkId,
-      process::http::Pipe::Writer writer);
+  void exited(const FrameworkID& frameworkId, const HttpConnection& http);
   void _exited(Framework* framework);
 
   // Invoked when the message is ready to be executed after
@@ -1225,6 +1224,37 @@ inline std::ostream& operator << (
     const Framework& framework);
 
 
+// Represents the streaming HTTP connection to a framework.
+struct HttpConnection
+{
+  HttpConnection(const process::http::Pipe::Writer& _writer,
+       ContentType _contentType)
+    :  writer(_writer),
+       contentType(_contentType),
+       encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
+
+  // Converts the message to an Event before sending.
+  template <typename Message>
+  bool send(const Message& message) {
+    return writer.write(encoder.encode(protobuf::scheduler::event(message)));
+  }
+
+  bool close()
+  {
+    return writer.close();
+  }
+
+  process::Future<Nothing> closed() const
+  {
+    return writer.readerClosed();
+  }
+
+  process::http::Pipe::Writer writer;
+  ContentType contentType;
+  recordio::Encoder<scheduler::Event> encoder;
+};
+
+
 // Information about a connected or completed framework.
 // TODO(bmahler): Keeping the task and executor information in sync
 // across the Slave and Framework structs is error prone!
@@ -1245,46 +1275,21 @@ struct Framework
 
   Framework(Master* const _master,
             const FrameworkInfo& _info,
-            const process::http::Pipe::Writer& writer,
-            ContentType contentType,
+            const HttpConnection& _http,
             const process::Time& time = process::Clock::now())
     : master(_master),
       info(_info),
+      http(_http),
       connected(true),
       active(true),
       registeredTime(time),
       reregisteredTime(time),
-      completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK)
-  {
-    // TODO(anand): This logic needs to be invoked each
-    // time the framework connects via http. Move it to
-    // a method instead, that gets invoked from
-    // addFramework and failoverFrameowrk.
-
-    auto serialize = [contentType](const scheduler::Event& event) {
-      switch (contentType) {
-        case ContentType::PROTOBUF: {
-          return event.SerializeAsString();
-        }
-        case ContentType::JSON: {
-          JSON::Object object = JSON::Protobuf(event);
-          return stringify(object);
-        }
-      }
-    };
-
-    auto encoder = recordio::Encoder<scheduler::Event>(serialize);
-
-    http = Http {writer, encoder};
-
-    http.get().writer.readerClosed().
-      onAny(defer(master->self(), &Master::exited, id(), writer));
-  }
+      completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
 
   ~Framework()
   {
     if (http.isSome() && connected) {
-      if (!http.get().writer.close()) {
+      if (!http.get().close()) {
         LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
       }
     }
@@ -1343,7 +1348,7 @@ struct Framework
     if (http.isSome()) {
       const scheduler::Event event = protobuf::scheduler::event(message);
 
-      if (!http.get().writer.write(http.get().encoder.encode(event))) {
+      if (!http.get().send(message)) {
         LOG(WARNING) << "Unable to send event to framework " << *this << ":"
                      << " connection closed";
       }
@@ -1503,21 +1508,25 @@ struct Framework
     }
   }
 
+  void updateConnection(const HttpConnection& other)
+  {
+    // Close the existing connection if it has changed.
+    if (http.isSome() && http.get().writer != other.writer) {
+      http.get().close();
+    }
+
+    http = other;
+  }
+
   Master* const master;
 
   FrameworkInfo info;
 
-  struct Http
-  {
-    process::http::Pipe::Writer writer;
-    recordio::Encoder<scheduler::Event> encoder;
-  };
-
   // Frameworks can either be connected via HTTP or by message
   // passing (scheduler driver). Exactly one of 'http' and 'pid'
   // will be set according to the last connection made by the
   // framework.
-  Option<Http> http;
+  Option<HttpConnection> http;
   Option<process::UPID> pid;
 
   // Framework becomes disconnected when the socket closes.

Reply via email to