Repository: mesos
Updated Branches:
  refs/heads/master dbf35da46 -> 586307e54


Revert "Add subscribe-> subscribed workflow for http frameworks"

This reverts commit 1709e8a82fdae6a10893f0cfb9d2fae144d6a7a8.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/586307e5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/586307e5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/586307e5

Branch: refs/heads/master
Commit: 586307e546955c23aadf0ed5bfae9e00c0228f86
Parents: dbf35da
Author: Benjamin Mahler <[email protected]>
Authored: Wed Aug 5 16:47:36 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Wed Aug 5 16:47:36 2015 -0700

----------------------------------------------------------------------
 src/master/http.cpp   |  21 +--
 src/master/master.cpp | 343 ++++++---------------------------------------
 src/master/master.hpp |  36 -----
 3 files changed, 42 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 181db46..76e7080 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -70,7 +70,6 @@ using process::http::InternalServerError;
 using process::http::NotFound;
 using process::http::NotImplemented;
 using process::http::OK;
-using process::http::Pipe;
 using process::http::TemporaryRedirect;
 using process::http::Unauthorized;
 using process::http::UnsupportedMediaType;
@@ -376,24 +375,10 @@ Future<Response> Master::Http::call(const Request& request) const
     responseContentType = ContentType::PROTOBUF;
   }
 
-  switch (call.type()) {
-    case scheduler::Call::SUBSCRIBE: {
-      Pipe pipe;
-      OK ok;
+  // Silence unused warning for now.
+  (void)responseContentType;
 
-      ok.type = Response::PIPE;
-      ok.reader = pipe.reader();
-
-      master->subscribe(
-          call.subscribe(),
-          responseContentType,
-          pipe.writer());
-
-      return ok;
-    }
-    default:
-      break;
-  }
+  // TODO(anand): Handle the call.
 
   return NotImplemented();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e738607..50b9824 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1771,227 +1771,6 @@ void Master::reregisterFramework(
 
 
 void Master::subscribe(
-    const scheduler::Call::Subscribe& subscribe,
-    ContentType contentType,
-    Pipe::Writer writer)
-{
-  // TODO(anand): We need to ensure that framework is authenticated
-  // before calling into this function. If not, we need to build the
-  // authentication logic here before invoking subscribe(...)
-
-  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
-
-  LOG(INFO) << "Received registration request for"
-            << " http framework '" << frameworkInfo.name() << "'";
-
-  // Assign a new FrameworkID.
-  FrameworkInfo frameworkInfo_ = frameworkInfo;
-  frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
-
-  Framework* framework = new Framework(
-      this,
-      frameworkInfo_,
-      writer,
-      contentType);
-
-  // TODO(anand): Add validation/authorization logic before we invoke
-  // the continuation function.
-  this->_subscribe(framework, subscribe);
-}
-
-
-void Master::_subscribe(
-    Framework* framework,
-    const scheduler::Call::Subscribe& subscribe)
-{
-  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
-
-  if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
-    // TODO(anand): Make '(re-)registerFramework()' also call into
-    // this method (MESOS-3182).
-    LOG(INFO) << "Registering framework " << *framework
-              << " with checkpointing "
-              << (framework->info.checkpoint() ? "enabled" : "disabled")
-              << " and capabilities " << framework->info.capabilities();
-
-    // TODO(vinod): Deprecate this in favor of authorization.
-    bool rootSubmissions = flags.root_submissions;
-
-    if (framework->info.user() == "root" && rootSubmissions == false) {
-      LOG(INFO) << "Framework " << *framework << " registering as root, but "
-                << "root submissions are disabled on this cluster";
-      FrameworkErrorMessage message;
-      message.set_message("User 'root' is not allowed to run frameworks");
-      framework->send(message);
-      delete framework;
-      return;
-    }
-
-    addFramework(framework);
-
-    FrameworkRegisteredMessage message;
-    message.mutable_framework_id()->MergeFrom(framework->id());
-    message.mutable_master_info()->MergeFrom(info_);
-    framework->send(message);
-    return;
-  }
-
-  const bool force = subscribe.has_force() ? subscribe.force() : false;
-  const bool isRegistered = frameworks.registered.count(frameworkInfo.id());
-
-  // TODO(anand): Completed frameworks might try to subscribe
-  // again later. This is part of validation checks in the
-  // reregister(...) function. We need to refactor those checks into a
-  // separate function and then use them here.
-
-  LOG(INFO) << "Re-registering framework " << frameworkInfo.id()
-            << " (" << frameworkInfo.name() << ")  with checkpointing "
-            << (frameworkInfo.checkpoint() ? "enabled" : "disabled")
-            << " and capabilities " << frameworkInfo.capabilities();
-
-  if (isRegistered) {
-    // Using the "failover" of the scheduler allows us to keep a
-    // scheduler that got partitioned but didn't die (in ZooKeeper
-    // speak this means didn't lose their session) and then
-    // eventually tried to connect to this master even though
-    // another instance of their scheduler has reconnected. This
-    // might not be an issue in the future when the
-    // master/allocator launches the scheduler can get restarted
-    // (if necessary) by the master and the master will always
-    // know which scheduler is the correct one.
-
-    Framework* registeredFramework =
-      CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]);
-
-    // Update the framework's info fields based on those passed during
-    // re-registration.
-    LOG(INFO) << "Updating info for framework " << framework->id();
-
-    registeredFramework->updateFrameworkInfo(framework->info);
-    allocator->updateFramework(
-        registeredFramework->id(),
-        registeredFramework->info);
-
-    registeredFramework->reregisteredTime = Clock::now();
-
-    if (force) {
-      // TODO(benh): Should we check whether the new scheduler has
-      // given us a different framework name, user name or executor
-      // info?
-      LOG(INFO) << "Framework " << *registeredFramework << " failed over";
-      failoverFramework(registeredFramework, framework);
-    } else {
-      if (registeredFramework->connected) {
-        // It's very hard to diffrentiate between retries from the
-        // same scheduler or a old scheduler that got failed over to a
-        // new one and is now retrying. An example scenario can be:
-        // A scheduler sends us a SUBCRIBE request, we process it and
-        // the stream breaks just before we could send a SUBSCRIBED
-        // event back. If the scheduler retries now, we would see it
-        // as connected. This scenario is no different from a old
-        // scheduler retrying after it has already failed over.
-        // Eventually the master would detect the disconnection, start
-        // the failover clock and then the framework can connect. This
-        // was slightly easier for pid frameworks as we used to check
-        // if the pid values matched.
-        FrameworkErrorMessage message;
-        message.set_message("Framework is already connected");
-        framework->send(message);
-        delete framework;
-        return;
-      }
-
-      LOG(INFO) << "Allowing framework " << *registeredFramework
-                << " to re-register with an already used id";
-
-      // Convert the framework to a http framework if it was pid based
-      // in the past. Also, we don't need to set the readerClosed(...)
-      // callbacks again as it was done already when the framework
-      // object was created.
-      if (registeredFramework->pid.isSome()) {
-        registeredFramework->pid = None();
-        registeredFramework->http = framework->http;
-      } else {
-        registeredFramework->updateWriter(framework->http.get().writer);
-      }
-
-      // Remove any offers sent to this framework and reactivate it.
-      // NOTE: We need to do this because the scheduler might have
-      // replied to the offers but the driver might have dropped
-      // those messages since it wasn't connected to the master.
-      rescindOffersAndReactivate(registeredFramework);
-
-      FrameworkReregisteredMessage message;
-      message.mutable_framework_id()->MergeFrom(registeredFramework->id());
-      message.mutable_master_info()->MergeFrom(info_);
-      registeredFramework->send(message);
-    }
-  } else {
-    // We don't have a framework with this ID, so we must be a newly
-    // elected Mesos master to which either an existing scheduler or a
-    // failed-over one is connecting. Add any tasks it has that have
-    // been reported by reconnecting slaves.
-
-    // TODO(benh): Check for root submissions like above!
-
-    // Add active tasks and executors to the framework.
-    foreachvalue (Slave* slave, slaves.registered) {
-      foreachvalue (Task* task, slave->tasks[framework->id()]) {
-        framework->addTask(task);
-      }
-      foreachvalue (const ExecutorInfo& executor,
-                    slave->executors[framework->id()]) {
-        framework->addExecutor(slave->id, executor);
-      }
-    }
-
-    // N.B. Need to add the framework _after_ we add its tasks
-    // (above) so that we can properly determine the resources it's
-    // currently using!
-    addFramework(framework);
-
-    // TODO(bmahler): We have to send a registered message here for
-    // the re-registering framework, per the API contract. Send
-    // re-register here per MESOS-786; requires deprecation or it
-    // will break frameworks.
-    FrameworkRegisteredMessage message;
-    message.mutable_framework_id()->MergeFrom(framework->id());
-    message.mutable_master_info()->MergeFrom(info_);
-    framework->send(message);
-  }
-
-  CHECK(frameworks.registered.contains(frameworkInfo.id()))
-    << "Unknown framework " << frameworkInfo.id()
-    << " (" << frameworkInfo.name() << ")";
-
-  // Broadcast the new framework pid to all the slaves. We have to
-  // broadcast because an executor might be running on a slave but
-  // it currently isn't running any tasks. This could be a
-  // potential scalability issue ...
-  foreachvalue (Slave* slave, slaves.registered) {
-    UpdateFrameworkMessage message;
-    message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
-
-    // TODO(anand): We set 'pid' to UPID() for http frameworks
-    // as 'pid' was made optional in 0.24.0. In 0.25.0, we
-    // no longer have to set pid here for http frameowrks.
-    message.set_pid(framework->pid.getOrElse(UPID()));
-    send(slave->pid, message);
-  }
-
-  if (isRegistered) {
-    // Mark the framework as disconnected before deleting so as not
-    // to trigger closing the writer.
-    framework->connected = false;
-
-    // We only updated attributes in the existing stored framework.
-    // Delete the optimistically created framework object.
-    delete framework;
-  }
-}
-
-
-void Master::subscribe(
     const UPID& from,
     const scheduler::Call::Subscribe& subscribe)
 {
@@ -2202,7 +1981,7 @@ void Master::_subscribe(
         << " because it is not expected from " << from;
       FrameworkErrorMessage message;
       message.set_message("Framework failed over");
-      framework->send(message);
+      send(from, message);
       return;
     } else {
       LOG(INFO) << "Allowing framework " << *framework
@@ -4886,11 +4665,11 @@ void Master::addFramework(Framework* framework)
   CHECK(!frameworks.registered.contains(framework->id()))
     << "Framework " << *framework << " already exists!";
 
+  CHECK_SOME(framework->pid) << "adding http framework not implemented";
+
   frameworks.registered[framework->id()] = framework;
 
-  if (framework->pid.isSome()) {
-    link(framework->pid.get());
-  }
+  link(framework->pid.get());
 
   // TODO(anand): For http frameworks, add a readerClosed()
   // callback to invoke Master::exited() when the connection
@@ -4916,87 +4695,24 @@ void Master::addFramework(Framework* framework)
   // If the framework is authenticated, its principal should be in
   // 'authenticated'. Otherwise look if it's supplied in
   // FrameworkInfo.
-  if (framework->pid.isSome()) {
-    Option<string> principal = authenticated.get(framework->pid.get());
-    if (principal.isNone() && framework->info.has_principal()) {
-      principal = framework->info.principal();
-    }
-
-    CHECK(!frameworks.principals.contains(framework->pid.get()));
-    frameworks.principals.put(framework->pid.get(), principal);
-
-    // Export framework metrics if a principal is specified.
-    if (principal.isSome()) {
-      // Create new framework metrics if this framework is the first
-      // one of this principal. Otherwise existing metrics are reused.
-      if (!metrics->frameworks.contains(principal.get())) {
-        metrics->frameworks.put(
-            principal.get(),
-            Owned<Metrics::Frameworks>(
-              new Metrics::Frameworks(principal.get())));
-      }
-    }
+  Option<string> principal = authenticated.get(framework->pid.get());
+  if (principal.isNone() && framework->info.has_principal()) {
+    principal = framework->info.principal();
   }
-}
 
+  CHECK(!frameworks.principals.contains(framework->pid.get()));
+  frameworks.principals.put(framework->pid.get(), principal);
 
-void Master::rescindOffersAndReactivate(Framework* framework)
-{
-  // Remove the framework's offers (if they weren't removed before).
-  // We do this after we have updated the pid and sent the framework
-  // registered message so that the allocator can immediately re-offer
-  // these resources to this framework if it wants.
-  foreach (Offer* offer, utils::copy(framework->offers)) {
-    allocator->recoverResources(
-        offer->framework_id(), offer->slave_id(), offer->resources(), None());
-    removeOffer(offer);
-  }
-
-  // Reactivate the framework.
-  // NOTE: We do this after recovering resources (above) so that
-  // the allocator has the correct view of the framework's share.
-  if (!framework->active) {
-    framework->active = true;
-    allocator->activateFramework(framework->id());
+  // Export framework metrics if a principal is specified.
+  if (principal.isSome()) {
+    // Create new framework metrics if this framework is the first
+    // one of this principal. Otherwise existing metrics are reused.
+    if (!metrics->frameworks.contains(principal.get())) {
+      metrics->frameworks.put(
+          principal.get(),
+          Owned<Metrics::Frameworks>(new Metrics::Frameworks(principal.get())));
+    }
   }
-
-  // Mark the framework as connected now after reactivating it.
-  framework->connected = true;
-}
-
-
-// TODO(anand): Currently this function is only used to failover http
-// frameworks. Extend this function to handle both pid/http.
-// We can then get rid of the failoverFramework(...) overload for pid
-// based frameworks.
-void Master::failoverFramework(Framework* framework, Framework* newFramework)
-{
-  // Notify the old connected framework that it has failed over.
-  // It might be a duplicated message too, but it is virtually
-  // impossible for us to distinguish between them. We try to
-  // do a best effort logic here of letting the old scheduler
-  // know.
-  if (framework->connected) {
-    FrameworkErrorMessage message;
-    message.set_message("Framework failed over");
-    framework->send(message);
-  }
-
-  // The framework was previously registered as pid based and is
-  // now trying to failover as a http framework. Set it's pid
-  // to None and convert it to a http framework.
-  if (framework->pid.isSome()) {
-    framework->pid = None();
-    framework->http = newFramework->http;
-  }
-
-  framework->updateWriter(newFramework->http.get().writer);
-  rescindOffersAndReactivate(framework);
-
-  FrameworkRegisteredMessage message;
-  message.mutable_framework_id()->MergeFrom(framework->id());
-  message.mutable_master_info()->MergeFrom(info_);
-  framework->send(message);
 }
 
 
@@ -5004,6 +4720,8 @@ void Master::failoverFramework(Framework* framework, Framework* newFramework)
 // event of a scheduler failover.
 void Master::failoverFramework(Framework* framework, const UPID& newPid)
 {
+  CHECK_SOME(framework->pid) << "http framework failover not implemented";
+
   const UPID oldPid = framework->pid.get();
 
   // There are a few failover cases to consider:
@@ -5026,8 +4744,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
 
   framework->pid = newPid;
   link(newPid);
-
-  rescindOffersAndReactivate(framework);
+  framework->connected = true;
 
   // The scheduler driver safely ignores any duplicate registration
   // messages, so we don't need to compare the old and new pids here.
@@ -5036,6 +4753,24 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
   message.mutable_master_info()->MergeFrom(info_);
   framework->send(message);
 
+  // Remove the framework's offers (if they weren't removed before).
+  // We do this after we have updated the pid and sent the framework
+  // registered message so that the allocator can immediately re-offer
+  // these resources to this framework if it wants.
+  foreach (Offer* offer, utils::copy(framework->offers)) {
+    allocator->recoverResources(
+        offer->framework_id(), offer->slave_id(), offer->resources(), None());
+    removeOffer(offer);
+  }
+
+  // Reactivate the framework.
+  // NOTE: We do this after recovering resources (above) so that
+  // the allocator has the correct view of the framework's share.
+  if (!framework->active) {
+    framework->active = true;
+    allocator->activateFramework(framework->id());
+  }
+
   // 'Failover' the framework's metrics. i.e., change the lookup key
   // for its metrics to 'newPid'.
   if (oldPid != newPid && frameworks.principals.contains(oldPid)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/586307e5/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c71e343..e441749 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -584,14 +584,6 @@ protected:
   // the event of a scheduler failover.
   void failoverFramework(Framework* framework, const process::UPID& newPid);
 
-  // Failover the old framework. Copies various attributes from the
-  // new framework passed as argument.
-  void failoverFramework(Framework* framework, Framework* newFramework);
-
-  // Rescinds the framework's offers and reactivates the framework
-  // marking it as connected.
-  void rescindOffersAndReactivate(Framework* framework);
-
   // Kill all of a framework's tasks, delete the framework object, and
   // reschedule offers that were assigned to this framework.
   void removeFramework(Framework* framework);
@@ -706,23 +698,11 @@ private:
       const Offer::Operation& operation,
       const std::string& message);
 
-  // Subscribes a http framework.
-  void subscribe(
-      const scheduler::Call::Subscribe& subscribe,
-      ContentType contentType,
-      process::http::Pipe::Writer writer);
-
-  // Continuation of subscribe().
-  void _subscribe(
-      Framework* framework,
-      const scheduler::Call::Subscribe& subscribe);
-
   // Call handlers.
   void receive(
       const process::UPID& from,
       const scheduler::Call& call);
 
-  // Subscribes a pid framework.
   void subscribe(
       const process::UPID& from,
       const scheduler::Call::Subscribe& subscribe);
@@ -1523,22 +1503,6 @@ struct Framework
     }
   }
 
-  void updateWriter(process::http::Pipe::Writer writer)
-  {
-    // Don't update with the same writer.
-    if (http.get().writer == writer) {
-      return;
-    }
-
-    // Close the existing connection and update the writer.
-    http.get().writer.close();
-    http.get().writer = writer;
-
-    // Set up the reader closed callback.
-    http.get().writer.readerClosed()
-      .onAny(defer(master->self(), &Master::exited, id(), writer));
-  }
-
   Master* const master;
 
   FrameworkInfo info;

Reply via email to