Repository: mesos
Updated Branches:
  refs/heads/master bc26a44a5 -> dbf35da46


Add subscribe -> subscribed workflow for HTTP frameworks

Split review out of r36318. This change lets an HTTP framework make a
SUBSCRIBE call, with the master responding with a SUBSCRIBED event on the
persistent stream.

Also added framework failover functionality, the equivalent of re-register.
It should now be possible to merge the subscribe(...) introduced in this
review with the refactoring that happened in MESOS-3182.

- Made new exited()/failoverFramework() functions for HTTP frameworks that
  then invoke the continuation function common to pid and HTTP frameworks.
- The equivalent of the re-register functionality lives in _subscribe(...).

Review: https://reviews.apache.org/r/36720


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1709e8a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1709e8a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1709e8a8

Branch: refs/heads/master
Commit: 1709e8a82fdae6a10893f0cfb9d2fae144d6a7a8
Parents: bc26a44
Author: Anand Mazumdar <[email protected]>
Authored: Wed Aug 5 14:01:13 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Wed Aug 5 14:01:15 2015 -0700

----------------------------------------------------------------------
 src/master/http.cpp   |  21 ++-
 src/master/master.cpp | 343 +++++++++++++++++++++++++++++++++++++++------
 src/master/master.hpp |  36 +++++
 3 files changed, 358 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1709e8a8/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 76e7080..181db46 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -70,6 +70,7 @@ using process::http::InternalServerError;
 using process::http::NotFound;
 using process::http::NotImplemented;
 using process::http::OK;
+using process::http::Pipe;
 using process::http::TemporaryRedirect;
 using process::http::Unauthorized;
 using process::http::UnsupportedMediaType;
@@ -375,10 +376,24 @@ Future<Response> Master::Http::call(const Request& request) const
     responseContentType = ContentType::PROTOBUF;
   }
 
-  // Silence unused warning for now.
-  (void)responseContentType;
+  switch (call.type()) {
+    case scheduler::Call::SUBSCRIBE: {
+      Pipe pipe;
+      OK ok;
 
-  // TODO(anand): Handle the call.
+      ok.type = Response::PIPE;
+      ok.reader = pipe.reader();
+
+      master->subscribe(
+          call.subscribe(),
+          responseContentType,
+          pipe.writer());
+
+      return ok;
+    }
+    default:
+      break;
+  }
 
   return NotImplemented();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/1709e8a8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 50b9824..e738607 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1771,6 +1771,227 @@ void Master::reregisterFramework(
 
 
 void Master::subscribe(
+    const scheduler::Call::Subscribe& subscribe,
+    ContentType contentType,
+    Pipe::Writer writer)
+{
+  // Builds a Framework for the stream 'writer' and hands it to _subscribe().
+  // TODO(anand): We need to ensure that framework is authenticated
+  // before calling into this function. If not, we need to build the
+  // authentication logic here before invoking subscribe(...)
+  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
+
+  LOG(INFO) << "Received registration request for"
+            << " http framework '" << frameworkInfo.name() << "'";
+
+  // Assign a new FrameworkID only on the first subscription: a
+  // re-subscribing framework must keep its ID, which _subscribe() uses.
+  FrameworkInfo frameworkInfo_ = frameworkInfo;
+  if (!frameworkInfo_.has_id() || frameworkInfo_.id() == "") {
+    frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
+  }
+
+  Framework* framework =
+    new Framework(this, frameworkInfo_, writer, contentType);
+
+  // TODO(anand): Add validation/authorization logic before we invoke
+  // the continuation function.
+  this->_subscribe(framework, subscribe);
+}
+
+
+void Master::_subscribe(
+    Framework* framework,
+    const scheduler::Call::Subscribe& subscribe)
+{
+  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
+
+  if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
+    // First-time subscription. TODO(anand): Make '(re-)registerFramework()'
+    // also call into this method (MESOS-3182).
+    LOG(INFO) << "Registering framework " << *framework
+              << " with checkpointing "
+              << (framework->info.checkpoint() ? "enabled" : "disabled")
+              << " and capabilities " << framework->info.capabilities();
+
+    // TODO(vinod): Deprecate this in favor of authorization.
+    bool rootSubmissions = flags.root_submissions;
+
+    if (framework->info.user() == "root" && rootSubmissions == false) {
+      LOG(INFO) << "Framework " << *framework << " registering as root, but "
+                << "root submissions are disabled on this cluster";
+      FrameworkErrorMessage message;
+      message.set_message("User 'root' is not allowed to run frameworks");
+      framework->send(message);
+      delete framework;
+      return;
+    }
+
+    addFramework(framework);
+
+    FrameworkRegisteredMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id());
+    message.mutable_master_info()->MergeFrom(info_);
+    framework->send(message);
+    return;
+  }
+
+  const bool force = subscribe.has_force() ? subscribe.force() : false;
+  const bool isRegistered = frameworks.registered.count(frameworkInfo.id());
+
+  // TODO(anand): Completed frameworks might try to subscribe
+  // again later. This is part of validation checks in the
+  // reregister(...) function. We need to refactor those checks into a
+  // separate function and then use them here.
+
+  LOG(INFO) << "Re-registering framework " << frameworkInfo.id()
+            << " (" << frameworkInfo.name() << ")  with checkpointing "
+            << (frameworkInfo.checkpoint() ? "enabled" : "disabled")
+            << " and capabilities " << frameworkInfo.capabilities();
+
+  if (isRegistered) {
+    // Using the "failover" of the scheduler allows us to keep a
+    // scheduler that got partitioned but didn't die (in ZooKeeper
+    // speak this means didn't lose their session) and then
+    // eventually tried to connect to this master even though
+    // another instance of their scheduler has reconnected. This
+    // might not be an issue in the future when the
+    // master/allocator launches the scheduler, which can get restarted
+    // (if necessary) by the master and the master will always
+    // know which scheduler is the correct one.
+
+    Framework* registeredFramework =
+      CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]);
+
+    // Update the framework's info fields based on those passed during
+    // re-registration (this happens even if it is rejected below).
+    LOG(INFO) << "Updating info for framework " << framework->id();
+
+    registeredFramework->updateFrameworkInfo(framework->info);
+    allocator->updateFramework(
+        registeredFramework->id(),
+        registeredFramework->info);
+
+    registeredFramework->reregisteredTime = Clock::now();
+
+    if (force) {
+      // TODO(benh): Should we check whether the new scheduler has
+      // given us a different framework name, user name or executor
+      // info?
+      LOG(INFO) << "Framework " << *registeredFramework << " failed over";
+      failoverFramework(registeredFramework, framework);
+    } else {
+      if (registeredFramework->connected) {
+        // It's very hard to differentiate between retries from the
+        // same scheduler or an old scheduler that got failed over to a
+        // new one and is now retrying. An example scenario can be:
+        // A scheduler sends us a SUBSCRIBE request, we process it and
+        // the stream breaks just before we could send a SUBSCRIBED
+        // event back. If the scheduler retries now, we would see it
+        // as connected. This scenario is no different from an old
+        // scheduler retrying after it has already failed over.
+        // Eventually the master would detect the disconnection, start
+        // the failover clock and then the framework can connect. This
+        // was slightly easier for pid frameworks as we used to check
+        // if the pid values matched.
+        FrameworkErrorMessage message;
+        message.set_message("Framework is already connected");
+        framework->send(message);
+        delete framework;
+        return;
+      }
+
+      LOG(INFO) << "Allowing framework " << *registeredFramework
+                << " to re-register with an already used id";
+
+      // Convert the framework to a http framework if it was pid based
+      // in the past. Also, we don't need to set the readerClosed(...)
+      // callbacks again as it was done already when the framework
+      // object was created.
+      if (registeredFramework->pid.isSome()) {
+        registeredFramework->pid = None();
+        registeredFramework->http = framework->http;
+      } else {
+        registeredFramework->updateWriter(framework->http.get().writer);
+      }
+
+      // Remove any offers sent to this framework and reactivate it.
+      // NOTE: We need to do this because the scheduler might have
+      // replied to the offers but the driver might have dropped
+      // those messages since it wasn't connected to the master.
+      rescindOffersAndReactivate(registeredFramework);
+
+      FrameworkReregisteredMessage message;
+      message.mutable_framework_id()->MergeFrom(registeredFramework->id());
+      message.mutable_master_info()->MergeFrom(info_);
+      registeredFramework->send(message);
+    }
+  } else {
+    // We don't have a framework with this ID, so we must be a newly
+    // elected Mesos master to which either an existing scheduler or a
+    // failed-over one is connecting. Add any tasks it has that have
+    // been reported by reconnecting slaves.
+    // TODO(benh): Check for root submissions like above!
+
+    // Add active tasks and executors to the framework. NOTE: this and
+    // the CHECK below assume framework->id() == frameworkInfo.id().
+    foreachvalue (Slave* slave, slaves.registered) {
+      foreachvalue (Task* task, slave->tasks[framework->id()]) {
+        framework->addTask(task);
+      }
+      foreachvalue (const ExecutorInfo& executor,
+                    slave->executors[framework->id()]) {
+        framework->addExecutor(slave->id, executor);
+      }
+    }
+
+    // N.B. Need to add the framework _after_ we add its tasks
+    // (above) so that we can properly determine the resources it's
+    // currently using!
+    addFramework(framework);
+
+    // TODO(bmahler): We have to send a registered message here for
+    // the re-registering framework, per the API contract. Send
+    // re-register here per MESOS-786; requires deprecation or it
+    // will break frameworks.
+    FrameworkRegisteredMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id());
+    message.mutable_master_info()->MergeFrom(info_);
+    framework->send(message);
+  }
+
+  CHECK(frameworks.registered.contains(frameworkInfo.id()))
+    << "Unknown framework " << frameworkInfo.id()
+    << " (" << frameworkInfo.name() << ")";
+
+  // Broadcast the new framework pid to all the slaves. We have to
+  // broadcast because an executor might be running on a slave but
+  // it currently isn't running any tasks. This could be a
+  // potential scalability issue ...
+  foreachvalue (Slave* slave, slaves.registered) {
+    UpdateFrameworkMessage message;
+    message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
+
+    // TODO(anand): We set 'pid' to UPID() for http frameworks
+    // as 'pid' was made optional in 0.24.0. In 0.25.0, we
+    // no longer have to set pid here for http frameworks.
+    message.set_pid(framework->pid.getOrElse(UPID()));
+    send(slave->pid, message);
+  }
+
+  if (isRegistered) {
+    // Mark the framework as disconnected before deleting so as not
+    // to trigger closing the writer.
+    framework->connected = false;
+
+    // We only updated attributes in the existing stored framework.
+    // Delete the optimistically created framework object.
+    delete framework;
+  }
+}
+
+
+void Master::subscribe(
     const UPID& from,
     const scheduler::Call::Subscribe& subscribe)
 {
@@ -1981,7 +2202,7 @@ void Master::_subscribe(
         << " because it is not expected from " << from;
       FrameworkErrorMessage message;
       message.set_message("Framework failed over");
-      send(from, message);
+      framework->send(message);
       return;
     } else {
       LOG(INFO) << "Allowing framework " << *framework
@@ -4665,11 +4886,11 @@ void Master::addFramework(Framework* framework)
   CHECK(!frameworks.registered.contains(framework->id()))
     << "Framework " << *framework << " already exists!";
 
-  CHECK_SOME(framework->pid) << "adding http framework not implemented";
-
   frameworks.registered[framework->id()] = framework;
 
-  link(framework->pid.get());
+  if (framework->pid.isSome()) {
+    link(framework->pid.get());
+  }
 
   // TODO(anand): For http frameworks, add a readerClosed()
   // callback to invoke Master::exited() when the connection
@@ -4695,33 +4916,94 @@ void Master::addFramework(Framework* framework)
   // If the framework is authenticated, its principal should be in
   // 'authenticated'. Otherwise look if it's supplied in
   // FrameworkInfo.
-  Option<string> principal = authenticated.get(framework->pid.get());
-  if (principal.isNone() && framework->info.has_principal()) {
-    principal = framework->info.principal();
-  }
+  if (framework->pid.isSome()) {
+    Option<string> principal = authenticated.get(framework->pid.get());
+    if (principal.isNone() && framework->info.has_principal()) {
+      principal = framework->info.principal();
+    }
 
-  CHECK(!frameworks.principals.contains(framework->pid.get()));
-  frameworks.principals.put(framework->pid.get(), principal);
+    CHECK(!frameworks.principals.contains(framework->pid.get()));
+    frameworks.principals.put(framework->pid.get(), principal);
 
-  // Export framework metrics if a principal is specified.
-  if (principal.isSome()) {
-    // Create new framework metrics if this framework is the first
-    // one of this principal. Otherwise existing metrics are reused.
-    if (!metrics->frameworks.contains(principal.get())) {
-      metrics->frameworks.put(
-          principal.get(),
-          Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
+    // Export framework metrics if a principal is specified.
+    if (principal.isSome()) {
+      // Create new framework metrics if this framework is the first
+      // one of this principal. Otherwise existing metrics are reused.
+      if (!metrics->frameworks.contains(principal.get())) {
+        metrics->frameworks.put(
+            principal.get(),
+            Owned<Metrics::Frameworks>(
+              new Metrics::Frameworks(principal.get())));
+      }
     }
   }
 }
 
 
+void Master::rescindOffersAndReactivate(Framework* framework)
+{
+  // Remove the framework's offers (if they weren't removed before),
+  // iterating over a copy since removeOffer() presumably mutates
+  // 'framework->offers'. Callers update the framework's pid/connection
+  // first so the allocator can immediately re-offer these resources.
+  foreach (Offer* offer, utils::copy(framework->offers)) {
+    allocator->recoverResources(
+        offer->framework_id(), offer->slave_id(), offer->resources(), None());
+    removeOffer(offer);
+  }
+
+  // Reactivate the framework.
+  // NOTE: We do this after recovering resources (above) so that
+  // the allocator has the correct view of the framework's share.
+  if (!framework->active) {
+    framework->active = true;
+    allocator->activateFramework(framework->id());
+  }
+
+  // Mark the framework as connected now after reactivating it.
+  framework->connected = true;
+}
+
+
+// Fails over the registered 'framework' to the HTTP connection held by
+// 'newFramework'. TODO(anand): Currently only used to failover http
+// frameworks. Extend this function to handle both pid/http. We can then
+// get rid of the failoverFramework(...) overload for pid based frameworks.
+void Master::failoverFramework(Framework* framework, Framework* newFramework)
+{
+  // Notify the old connected framework that it has failed over.
+  // It might be a duplicated message too, but it is virtually
+  // impossible for us to distinguish between them, so this is only
+  // a best-effort attempt at letting the old scheduler know before
+  // its connection is replaced below.
+  if (framework->connected) {
+    FrameworkErrorMessage message;
+    message.set_message("Framework failed over");
+    framework->send(message);
+  }
+
+  // The framework was previously registered as pid based and is
+  // now trying to failover as a http framework. Set its pid
+  // to None and convert it to a http framework.
+  if (framework->pid.isSome()) {
+    framework->pid = None();
+    framework->http = newFramework->http;
+  }
+  // NOTE: Returns early (no new readerClosed() callback) if converted above.
+  framework->updateWriter(newFramework->http.get().writer);
+  rescindOffersAndReactivate(framework);
+
+  FrameworkRegisteredMessage message;
+  message.mutable_framework_id()->MergeFrom(framework->id());
+  message.mutable_master_info()->MergeFrom(info_);
+  framework->send(message);
+}
+
+
 // Replace the scheduler for a framework with a new process ID, in the
 // event of a scheduler failover.
 void Master::failoverFramework(Framework* framework, const UPID& newPid)
 {
-  CHECK_SOME(framework->pid) << "http framework failover not implemented";
-
   const UPID oldPid = framework->pid.get();
 
   // There are a few failover cases to consider:
@@ -4744,7 +5026,8 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
 
   framework->pid = newPid;
   link(newPid);
-  framework->connected = true;
+
+  rescindOffersAndReactivate(framework);
 
   // The scheduler driver safely ignores any duplicate registration
   // messages, so we don't need to compare the old and new pids here.
@@ -4753,24 +5036,6 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
   message.mutable_master_info()->MergeFrom(info_);
   framework->send(message);
 
-  // Remove the framework's offers (if they weren't removed before).
-  // We do this after we have updated the pid and sent the framework
-  // registered message so that the allocator can immediately re-offer
-  // these resources to this framework if it wants.
-  foreach (Offer* offer, utils::copy(framework->offers)) {
-    allocator->recoverResources(
-        offer->framework_id(), offer->slave_id(), offer->resources(), None());
-    removeOffer(offer);
-  }
-
-  // Reactivate the framework.
-  // NOTE: We do this after recovering resources (above) so that
-  // the allocator has the correct view of the framework's share.
-  if (!framework->active) {
-    framework->active = true;
-    allocator->activateFramework(framework->id());
-  }
-
   // 'Failover' the framework's metrics. i.e., change the lookup key
   // for its metrics to 'newPid'.
   if (oldPid != newPid && frameworks.principals.contains(oldPid)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/1709e8a8/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e441749..c71e343 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -584,6 +584,14 @@ protected:
   // the event of a scheduler failover.
   void failoverFramework(Framework* framework, const process::UPID& newPid);
 
+  // Failover the old framework. Copies various attributes from the
+  // new framework passed as argument.
+  void failoverFramework(Framework* framework, Framework* newFramework);
+
+  // Rescinds the framework's offers and reactivates the framework
+  // marking it as connected.
+  void rescindOffersAndReactivate(Framework* framework);
+
   // Kill all of a framework's tasks, delete the framework object, and
   // reschedule offers that were assigned to this framework.
   void removeFramework(Framework* framework);
@@ -698,11 +706,23 @@ private:
       const Offer::Operation& operation,
       const std::string& message);
 
+  // Subscribes a http framework.
+  void subscribe(
+      const scheduler::Call::Subscribe& subscribe,
+      ContentType contentType,
+      process::http::Pipe::Writer writer);
+
+  // Continuation of subscribe().
+  void _subscribe(
+      Framework* framework,
+      const scheduler::Call::Subscribe& subscribe);
+
   // Call handlers.
   void receive(
       const process::UPID& from,
       const scheduler::Call& call);
 
+  // Subscribes a pid framework.
   void subscribe(
       const process::UPID& from,
       const scheduler::Call::Subscribe& subscribe);
@@ -1503,6 +1523,22 @@ struct Framework
     }
   }
 
+  void updateWriter(process::http::Pipe::Writer writer)
+  {
+    // Requires 'http' to be set; no-op if 'writer' is already in use.
+    if (http.get().writer == writer) {
+      return;
+    }
+
+    // Close the existing connection and update the writer.
+    http.get().writer.close();
+    http.get().writer = writer;
+
+    // Notify Master::exited() (deferred to the master) once the reader closes.
+    http.get().writer.readerClosed()
+      .onAny(defer(master->self(), &Master::exited, id(), writer));
+  }
+
   Master* const master;
 
   FrameworkInfo info;

Reply via email to