Repository: mesos
Updated Branches:
  refs/heads/master ad1d6fca2 -> d24b3ad64


Implemented the SUBSCRIBE call for http schedulers in the master.

Review: https://reviews.apache.org/r/36720


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d24b3ad6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d24b3ad6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d24b3ad6

Branch: refs/heads/master
Commit: d24b3ad6496cafec8f8ea4b02aa106a30e5c1d75
Parents: ad1d6fc
Author: Anand Mazumdar <[email protected]>
Authored: Thu Aug 6 22:04:01 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Fri Aug 7 00:13:57 2015 -0700

----------------------------------------------------------------------
 src/master/http.cpp   |  21 ++-
 src/master/master.cpp | 336 ++++++++++++++++++++++++++++++++++++++-------
 src/master/master.hpp |  39 +++++-
 3 files changed, 342 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d24b3ad6/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 76e7080..7d7e562 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -70,6 +70,7 @@ using process::http::InternalServerError;
 using process::http::NotFound;
 using process::http::NotImplemented;
 using process::http::OK;
+using process::http::Pipe;
 using process::http::TemporaryRedirect;
 using process::http::Unauthorized;
 using process::http::UnsupportedMediaType;
@@ -375,10 +376,24 @@ Future<Response> Master::Http::call(const Request& request) const
     responseContentType = ContentType::PROTOBUF;
   }
 
-  // Silence unused warning for now.
-  (void)responseContentType;
+  switch (call.type()) {
+    case scheduler::Call::SUBSCRIBE: {
+      Pipe pipe;
+      OK ok;
 
-  // TODO(anand): Handle the call.
+      ok.type = Response::PIPE;
+      ok.reader = pipe.reader();
+
+      HttpConnection http {pipe.writer(), responseContentType};
+      master->subscribe(http, call.subscribe());
+
+      return ok;
+    }
+    default:
+      // TODO(bmahler): Log fatally here once all calls are
+      // implemented, since validation should catch this.
+      break;
+  }
 
   return NotImplemented();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/d24b3ad6/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a7b8527..0330f94 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1772,6 +1772,208 @@ void Master::reregisterFramework(
 
 
 void Master::subscribe(
+    HttpConnection http,
+    const scheduler::Call::Subscribe& subscribe)
+{
+  // TODO(anand): Authenticate the framework.
+
+  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
+  Option<Error> validationError = None();
+
+  // TODO(vinod): Deprecate this in favor of ACLs.
+  if (validationError.isNone() && !roles.contains(frameworkInfo.role())) {
+    validationError = Error("Role '" + frameworkInfo.role() + "' is not" +
+                            " present in the master's --roles");
+  }
+
+  // TODO(vinod): Deprecate this in favor of authorization.
+  if (validationError.isNone() &&
+      frameworkInfo.user() == "root" && !flags.root_submissions) {
+    validationError = Error("User 'root' is not allowed to run frameworks"
+                            " without --root_submissions set");
+  }
+
+  if (validationError.isNone() && frameworkInfo.has_id()) {
+    foreach (const shared_ptr<Framework>& framework, frameworks.completed) {
+      if (framework->id() == frameworkInfo.id()) {
+        // This could happen if a framework tries to subscribe after
+        // its failover timeout has elapsed or it unregistered itself
+        // by calling 'stop()' on the scheduler driver.
+        //
+        // TODO(vinod): Master should persist admitted frameworks to the
+        // registry and remove them from it after failover timeout.
+        validationError = Error("Framework has been removed");
+        break;
+      }
+    }
+  }
+
+  LOG(INFO) << "Received registration request for"
+            << " http framework '" << frameworkInfo.name() << "'";
+
+  if (validationError.isSome()) {
+    LOG(INFO) << "Refusing subscription of framework"
+              << " '" << frameworkInfo.name() << "': "
+              << validationError.get().message;
+
+    FrameworkErrorMessage message;
+    message.set_message(validationError.get().message);
+
+    http.send(message);
+    http.close();
+    return;
+  }
+
+  // TODO(anand): Authorize the framework.
+  this->_subscribe(http, subscribe);
+}
+
+
+void Master::_subscribe(
+    HttpConnection http,
+    const scheduler::Call::Subscribe& subscribe)
+{
+  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
+
+  LOG(INFO) << "Subscribing framework " << frameworkInfo.name()
+            << " with checkpointing "
+            << (frameworkInfo.checkpoint() ? "enabled" : "disabled")
+            << " and capabilities " << frameworkInfo.capabilities();
+
+  if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
+    // If we are here the framework is subscribing for the first time.
+    // Assign a new FrameworkID.
+    FrameworkInfo frameworkInfo_ = frameworkInfo;
+    frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
+
+    Framework* framework = new Framework(this, frameworkInfo_, http);
+
+    addFramework(framework);
+
+    FrameworkRegisteredMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id());
+    message.mutable_master_info()->MergeFrom(info_);
+    framework->send(message);
+    return;
+  }
+
+  // If we are here framework has already been assigned an id.
+  CHECK(!frameworkInfo.id().value().empty());
+
+  if (frameworks.registered.contains(frameworkInfo.id())) {
+    // Using the "force" field of the scheduler allows us to keep a
+    // scheduler that got partitioned but didn't die (in ZooKeeper
+    // speak this means didn't lose their session) and then
+    // eventually tried to connect to this master even though
+    // another instance of their scheduler has reconnected.
+
+    Framework* framework =
+      CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]);
+
+    // Update the framework's info fields based on those passed during
+    // re-registration.
+    LOG(INFO) << "Updating info for framework " << framework->id();
+
+    framework->updateFrameworkInfo(frameworkInfo);
+    allocator->updateFramework(framework->id(), framework->info);
+
+    framework->reregisteredTime = Clock::now();
+
+    if (subscribe.force()) {
+      LOG(INFO) << "Framework " << *framework << " failed over";
+      failoverFramework(framework, http);
+    } else if (framework->connected) {
+      // Note that if the scheduler is retrying we expect it
+      // to close its old connection. But, the master may not
+      // realize that the connection is closed before the retry
+      // occurs so we may kick off a scheduler unnecessarily.
+      LOG(ERROR) << "Disallowing subscription attempt"
+                 << " of framework " << *framework
+                 << " because it is already connected";
+
+      FrameworkErrorMessage message;
+      message.set_message("Framework is already connected");
+
+      http.send(message);
+      http.close();
+      return;
+    } else {
+      LOG(INFO) << "Allowing framework " << *framework
+                << " to subcribe with an already used id";
+
+      // Convert the framework to an http framework if it was
+      // pid based in the past.
+      if (framework->pid.isSome()) {
+        framework->pid = None();
+      }
+
+      framework->connected = true;
+      framework->updateConnection(http);
+
+      http.closed()
+        .onAny(defer(self(), &Self::exited, framework->id(), http));
+
+      // Reactivate the framework.
+      if (!framework->active) {
+        framework->active = true;
+        allocator->activateFramework(framework->id());
+      }
+
+      FrameworkReregisteredMessage message;
+      message.mutable_framework_id()->MergeFrom(framework->id());
+      message.mutable_master_info()->MergeFrom(info_);
+      framework->send(message);
+    }
+  } else {
+    // We don't have a framework with this ID, so we must be a newly
+    // elected Mesos master to which either an existing scheduler or a
+    // failed-over one is connecting. Create a Framework object and add
+    // any tasks it has that have been reported by reconnecting slaves.
+    Framework* framework = new Framework(this, frameworkInfo, http);
+
+    // Add active tasks and executors to the framework.
+    foreachvalue (Slave* slave, slaves.registered) {
+      foreachvalue (Task* task, slave->tasks[framework->id()]) {
+        framework->addTask(task);
+      }
+      foreachvalue (const ExecutorInfo& executor,
+                    slave->executors[framework->id()]) {
+        framework->addExecutor(slave->id, executor);
+      }
+    }
+
+    // N.B. Need to add the framework _after_ we add its tasks
+    // (above) so that we can properly determine the resources it's
+    // currently using!
+    addFramework(framework);
+
+    FrameworkReregisteredMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id());
+    message.mutable_master_info()->MergeFrom(info_);
+    framework->send(message);
+  }
+
+  CHECK(frameworks.registered.contains(frameworkInfo.id()))
+    << "Unknown framework " << frameworkInfo.id()
+    << " (" << frameworkInfo.name() << ")";
+
+  // Broadcast the new framework pid to all the slaves. We have to
+  // broadcast because an executor might be running on a slave but
+  // it currently isn't running any tasks.
+  foreachvalue (Slave* slave, slaves.registered) {
+    UpdateFrameworkMessage message;
+    message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
+
+    // TODO(anand): We set 'pid' to UPID() for http frameworks
+    // as 'pid' was made optional in 0.24.0. In 0.25.0, we
+    // no longer have to set pid here for http frameworks.
+    message.set_pid(UPID());
+    send(slave->pid, message);
+  }
+}
+
+
+void Master::subscribe(
     const UPID& from,
     const scheduler::Call::Subscribe& subscribe)
 {
@@ -1787,8 +1989,12 @@ void Master::subscribe(
               << " framework '" << frameworkInfo.name() << "' at " << from
               << " because authentication is still in progress";
 
+    // Need to disambiguate for the compiler.
+    void (Master::*f)(const UPID&, const scheduler::Call::Subscribe&)
+      = &Self::subscribe;
+
     authenticating[from]
-      .onReady(defer(self(), &Self::subscribe, from, subscribe));
+      .onReady(defer(self(), f, from, subscribe));
     return;
   }
 
@@ -1909,7 +2115,6 @@ void Master::_subscribe(
 
   if (!frameworkInfo.has_id() || frameworkInfo.id().value().empty()) {
     // If we are here the framework is subscribing for the first time.
-
     // Check if this framework is already subscribed (because it retries).
     foreachvalue (Framework* framework, frameworks.registered) {
       if (framework->pid == from) {
@@ -1942,16 +2147,12 @@ void Master::_subscribe(
   // If we are here framework has already been assigned an id.
   CHECK(!frameworkInfo.id().value().empty());
 
-  if (frameworks.registered.count(frameworkInfo.id()) > 0) {
+  if (frameworks.registered.contains(frameworkInfo.id())) {
     // Using the "force" field of the scheduler allows us to keep a
     // scheduler that got partitioned but didn't die (in ZooKeeper
     // speak this means didn't lose their session) and then
     // eventually tried to connect to this master even though
-    // another instance of their scheduler has reconnected. This
-    // might not be an issue in the future when the
-    // master/allocator launches the scheduler can get restarted
-    // (if necessary) by the master and the master will always
-    // know which scheduler is the correct one.
+    // another instance of their scheduler has reconnected.
 
     Framework* framework =
       CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]);
@@ -1959,8 +2160,8 @@ void Master::_subscribe(
     // Update the framework's info fields based on those passed during
     // subscription.
     LOG(INFO) << "Updating info for framework " << framework->id();
-    framework->updateFrameworkInfo(frameworkInfo);
 
+    framework->updateFrameworkInfo(frameworkInfo);
     allocator->updateFramework(framework->id(), framework->info);
 
     framework->reregisteredTime = Clock::now();
@@ -1974,9 +2175,9 @@ void Master::_subscribe(
       LOG(INFO) << "Framework " << *framework << " failed over";
       failoverFramework(framework, from);
     } else if (framework->pid != from) {
-      LOG(ERROR)
-        << "Disallowing subscription attempt of framework " << *framework
-        << " because it is not expected from " << from;
+      LOG(ERROR) << "Disallowing subscription attempt of"
+                 << " framework " << *framework
+                 << " because it is not expected from " << from;
       FrameworkErrorMessage message;
       message.set_message("Framework failed over");
       send(from, message);
@@ -1998,6 +2199,7 @@ void Master::_subscribe(
         removeOffer(offer, true); // Rescind.
       }
 
+      // TODO(bmahler): Shouldn't this re-link with the scheduler?
       framework->connected = true;
 
       // Reactivate the framework.
@@ -2053,8 +2255,7 @@ void Master::_subscribe(
 
   // Broadcast the new framework pid to all the slaves. We have to
   // broadcast because an executor might be running on a slave but
-  // it currently isn't running any tasks. This could be a
-  // potential scalability issue ...
+  // it currently isn't running any tasks.
   foreachvalue (Slave* slave, slaves.registered) {
     UpdateFrameworkMessage message;
     message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
@@ -2128,7 +2329,7 @@ void Master::disconnect(Framework* framework)
 
     // Close the HTTP connection, which may already have
     // been closed due to scheduler disconnection.
-    framework->http.get().writer.close();
+    framework->http.get().close();
   }
 
   deactivate(framework);
@@ -4720,13 +4921,48 @@ void Master::addFramework(Framework* framework)
 }
 
 
+void Master::failoverFramework(Framework* framework, const HttpConnection& http)
+{
+  // Notify the old connected framework that it has failed over.
+  // Note that this may be a retry in which case we'll shut down
+  // the scheduler unnecessarily.
+  if (framework->connected) {
+    FrameworkErrorMessage message;
+    message.set_message("Framework failed over");
+    framework->send(message);
+  }
+
+  // If this is an upgrade, clear the authentication related data.
+  if (framework->pid.isSome()) {
+    authenticated.erase(framework->pid.get());
+
+    CHECK(frameworks.principals.contains(framework->pid.get()));
+    Option<string> principal = frameworks.principals[framework->pid.get()];
+
+    frameworks.principals.erase(framework->pid.get());
+
+    // Remove the metrics for the principal if this framework is the
+    // last one with this principal.
+    if (principal.isSome() &&
+        !frameworks.principals.containsValue(principal.get())) {
+      CHECK(metrics->frameworks.contains(principal.get()));
+      metrics->frameworks.erase(principal.get());
+    }
+  }
+
+  framework->updateConnection(http);
+  http.closed()
+    .onAny(defer(self(), &Self::exited, framework->id(), http));
+
+  _failoverFramework(framework);
+}
+
+
 // Replace the scheduler for a framework with a new process ID, in the
 // event of a scheduler failover.
 void Master::failoverFramework(Framework* framework, const UPID& newPid)
 {
-  CHECK_SOME(framework->pid) << "http framework failover not implemented";
-
-  const UPID oldPid = framework->pid.get();
+  const Option<UPID> oldPid = framework->pid;
 
   // There are a few failover cases to consider:
   //   1. The pid has changed. In this case we definitely want to
@@ -4738,25 +4974,31 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
   //          scheduler as it is necessarily dead.
   //      2.2 This is a duplicate message. In this case, the scheduler
   //          has not failed over, so we do not want to shut it down.
-  if (oldPid != newPid) {
+  if (oldPid.isSome() && oldPid != newPid) {
     FrameworkErrorMessage message;
     message.set_message("Framework failed over");
-    send(oldPid, message);
+    framework->send(message);
   }
 
-  // TODO(benh): unlink(oldPid);
-
-  framework->pid = newPid;
+  framework->updateConnection(newPid);
   link(newPid);
-  framework->connected = true;
 
-  // The scheduler driver safely ignores any duplicate registration
-  // messages, so we don't need to compare the old and new pids here.
-  FrameworkRegisteredMessage message;
-  message.mutable_framework_id()->MergeFrom(framework->id());
-  message.mutable_master_info()->MergeFrom(info_);
-  framework->send(message);
+  _failoverFramework(framework);
+
+  CHECK_SOME(framework->pid);
+
+  // Update the principal mapping for this framework, which is
+  // needed to keep the per-principal framework metrics accurate.
+  if (oldPid.isSome() && frameworks.principals.contains(oldPid.get())) {
+    frameworks.principals.erase(oldPid.get());
+  }
+
+  frameworks.principals[newPid] = authenticated.get(newPid);
+}
 
+
+void Master::_failoverFramework(Framework* framework)
+{
   // Remove the framework's offers (if they weren't removed before).
   // We do this after we have updated the pid and sent the framework
   // registered message so that the allocator can immediately re-offer
@@ -4767,7 +5009,9 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
     removeOffer(offer);
   }
 
-  // Reactivate the framework.
+  // Reconnect and reactivate the framework.
+  framework->connected = true;
+
   // NOTE: We do this after recovering resources (above) so that
   // the allocator has the correct view of the framework's share.
   if (!framework->active) {
@@ -4775,12 +5019,12 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
     allocator->activateFramework(framework->id());
   }
 
-  // 'Failover' the framework's metrics. i.e., change the lookup key
-  // for its metrics to 'newPid'.
-  if (oldPid != newPid && frameworks.principals.contains(oldPid)) {
-    frameworks.principals[newPid] = frameworks.principals[oldPid];
-    frameworks.principals.erase(oldPid);
-  }
+  // The scheduler driver safely ignores any duplicate registration
+  // messages, so we don't need to compare the old and new pids here.
+  FrameworkRegisteredMessage message;
+  message.mutable_framework_id()->MergeFrom(framework->id());
+  message.mutable_master_info()->MergeFrom(info_);
+  framework->send(message);
 }
 
 
@@ -4886,23 +5130,19 @@ void Master::removeFramework(Framework* framework)
   // TODO(anand): This only works for pid based frameworks. We would
   // need similar authentication logic for http frameworks.
   if (framework->pid.isSome()) {
-    // Remove the framework from authenticated.
     authenticated.erase(framework->pid.get());
 
     CHECK(frameworks.principals.contains(framework->pid.get()));
-    const Option<string> principal =
-      frameworks.principals[framework->pid.get()];
+    Option<string> principal = frameworks.principals[framework->pid.get()];
 
     frameworks.principals.erase(framework->pid.get());
 
-    // Remove the framework's message counters.
-    if (principal.isSome()) {
-      // Remove the metrics for the principal if this framework is the
-      // last one with this principal.
-      if (!frameworks.principals.containsValue(principal.get())) {
-        CHECK(metrics->frameworks.contains(principal.get()));
-        metrics->frameworks.erase(principal.get());
-      }
+    // Remove the metrics for the principal if this framework is the
+    // last one with this principal.
+    if (principal.isSome() &&
+        !frameworks.principals.containsValue(principal.get())) {
+      CHECK(metrics->frameworks.contains(principal.get()));
+      metrics->frameworks.erase(principal.get());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d24b3ad6/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 53420ca..b288b8a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -583,6 +583,12 @@ protected:
   // the event of a scheduler failover.
   void failoverFramework(Framework* framework, const process::UPID& newPid);
 
+  // Replace the scheduler for a framework with a new http connection,
+  // in the event of a scheduler failover.
+  void failoverFramework(Framework* framework, const HttpConnection& http);
+
+  void _failoverFramework(Framework* framework);
+
   // Kill all of a framework's tasks, delete the framework object, and
   // reschedule offers that were assigned to this framework.
   void removeFramework(Framework* framework);
@@ -703,6 +709,14 @@ private:
       const scheduler::Call& call);
 
   void subscribe(
+      HttpConnection http,
+      const scheduler::Call::Subscribe& subscribe);
+
+  void _subscribe(
+      HttpConnection http,
+      const scheduler::Call::Subscribe& subscribe);
+
+  void subscribe(
       const process::UPID& from,
       const scheduler::Call::Subscribe& subscribe);
 
@@ -1508,14 +1522,33 @@ struct Framework
     }
   }
 
-  void updateConnection(const HttpConnection& other)
+  void updateConnection(const process::UPID& newPid)
   {
+    // Remove the http connection if this is a downgrade from
+    // http to pid; note the connection may already be closed.
+    if (http.isSome()) {
+      http.get().close();
+      http = None();
+    }
+
+    // TODO(benh): unlink(oldPid);
+    pid = newPid;
+  }
+
+  void updateConnection(const HttpConnection& newHttp)
+  {
+    // Wipe the pid if this is an upgrade from pid to http.
+    if (pid.isSome()) {
+      // TODO(benh): unlink(oldPid);
+      pid = None();
+    }
+
     // Close the existing connection if it has changed.
-    if (http.isSome() && http.get().writer != other.writer) {
+    if (http.isSome() && http.get().writer != newHttp.writer) {
       http.get().close();
     }
 
-    http = other;
+    http = newHttp;
   }
 
   Master* const master;

Reply via email to