Repository: mesos
Updated Branches:
  refs/heads/master 0d08b2224 -> b8bec2027


Merged registerFramework() and reregisterFramework() into subscribe()
in Master.

Review: https://reviews.apache.org/r/37046


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b8bec202
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b8bec202
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b8bec202

Branch: refs/heads/master
Commit: b8bec2027e96d08f3b8c1d3f2313b39bd20aa132
Parents: 0d08b22
Author: Vinod Kone <[email protected]>
Authored: Fri Jul 31 15:03:10 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Tue Aug 4 10:53:53 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 293 +++++++++++++++------------------------------
 src/master/master.hpp |  18 +--
 2 files changed, 103 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b8bec202/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 351a3c2..87e11d5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1694,183 +1694,77 @@ void Master::registerFramework(
 {
   ++metrics->messages_register_framework;
 
-  if (authenticating.contains(from)) {
-    // TODO(vinod): Consider dropping this request and fix the tests
-    // to deal with the drop. Currently there is a race between master
-    // realizing the framework is authenticated and framework sending
-    // a registration request. Dropping this message will cause the
-    // framework to retry slowing down the tests.
-    LOG(INFO) << "Queuing up registration request for"
-              << " framework '" << frameworkInfo.name() << "' at " << from
-              << " because authentication is still in progress";
-
-    authenticating[from]
-      .onReady(defer(self(), &Self::registerFramework, from, frameworkInfo));
-    return;
-  }
-
-  Option<Error> validationError = None();
-
-  // TODO(vinod): Add "!=" operator for FrameworkID.
-  if (frameworkInfo.has_id() && !(frameworkInfo.id() == "")) {
-    validationError = Error("Registering with 'id' already set");
-  }
-
-  // TODO(vinod): Deprecate this in favor of ACLs.
-  if (validationError.isNone() && !roles.contains(frameworkInfo.role())) {
-    validationError = Error("Role '" + frameworkInfo.role() + "' is not" +
-                            " present in the master's --roles");
-  }
-
-  // TODO(vinod): Deprecate this in favor of authorization.
-  if (validationError.isNone()) {
-    if (frameworkInfo.user() == "root" && !flags.root_submissions) {
-      validationError = Error("User 'root' is not allowed to run frameworks"
-                              " without --root_submissions set");
-    }
-  }
+  if (frameworkInfo.has_id() && !frameworkInfo.id().value().empty()) {
+    const string error = "Registering with 'id' already set";
 
-  // Note that re-authentication errors are already handled above.
-  if (validationError.isNone()) {
-    validationError = validateFrameworkAuthentication(frameworkInfo, from);
-  }
-
-  if (validationError.isSome()) {
-    LOG(INFO) << "Refusing registration of framework"
-              << " '" << frameworkInfo.name() << "' at " << from << ": "
-              << validationError.get().message;
+    LOG(INFO) << "Refusing registration request of framework"
+              << " '" << frameworkInfo.name() << "' at " << from
+              << ": " << error;
 
     FrameworkErrorMessage message;
-    message.set_message(validationError.get().message);
+    message.set_message(error);
     send(from, message);
     return;
   }
 
-  LOG(INFO) << "Received registration request for"
-            << " framework '" << frameworkInfo.name() << "' at " << from;
+  scheduler::Call::Subscribe call;
+  call.mutable_framework_info()->CopyFrom(frameworkInfo);
 
-  // We allow an authenticated framework to not specify a principal
-  // in FrameworkInfo but we'd prefer if it did so we log a WARNING
-  // here when it happens.
-  if (!frameworkInfo.has_principal() && authenticated.contains(from)) {
-    LOG(WARNING) << "Framework at " << from
-                 << " (authenticated as '" << authenticated[from] << "')"
-                 << " does not set 'principal' in FrameworkInfo";
-  }
-
-  authorizeFramework(frameworkInfo)
-    .onAny(defer(self(),
-                 &Master::_registerFramework,
-                 from,
-                 frameworkInfo,
-                 lambda::_1));
+  subscribe(from, call);
 }
 
 
-void Master::_registerFramework(
+void Master::reregisterFramework(
     const UPID& from,
     const FrameworkInfo& frameworkInfo,
-    const Future<bool>& authorized)
+    bool failover)
 {
-  CHECK(!authorized.isDiscarded());
+  ++metrics->messages_reregister_framework;
 
-  Option<Error> authorizationError = None();
+  if (!frameworkInfo.has_id() || frameworkInfo.id().value().empty()) {
+    const string error = "Re-registering without an 'id'";
 
-  if (authorized.isFailed()) {
-    authorizationError =
-      Error("Authorization failure: " + authorized.failure());
-  } else if (!authorized.get()) {
-    authorizationError =
-      Error("Not authorized to use role '" + frameworkInfo.role() + "'");
-  }
-
-  if (authorizationError.isSome()) {
-    LOG(INFO) << "Refusing registration of framework"
+    LOG(INFO) << "Refusing re-registration request of framework"
               << " '" << frameworkInfo.name() << "' at " << from
-              << ": " << authorizationError.get().message;
+              << ": " << error;
 
     FrameworkErrorMessage message;
-    message.set_message(authorizationError.get().message);
+    message.set_message(error);
     send(from, message);
     return;
   }
 
-  // At this point, authentications errors will be due to
-  // re-authentication during the authorization process,
-  // so we drop the registration.
-  Option<Error> authenticationError =
-    validateFrameworkAuthentication(frameworkInfo, from);
-
-  if (authenticationError.isSome()) {
-    LOG(INFO) << "Dropping registration request for framework"
-              << " '" << frameworkInfo.name() << "' at " << from
-              << ": " << authenticationError.get().message;
-    return;
-  }
-
-  // Check if this framework is already registered (because it retries).
-  foreachvalue (Framework* framework, frameworks.registered) {
-    if (framework->pid == from) {
-      LOG(INFO) << "Framework " << *framework
-                << " already registered, resending acknowledgement";
-      FrameworkRegisteredMessage message;
-      message.mutable_framework_id()->MergeFrom(framework->id());
-      message.mutable_master_info()->MergeFrom(info_);
-      framework->send(message);
-      return;
-    }
-  }
-
-  // Assign a new FrameworkID.
-  FrameworkInfo frameworkInfo_ = frameworkInfo;
-  frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
-
-  Framework* framework = new Framework(this, frameworkInfo_, from);
-
-  LOG(INFO) << "Registering framework " << *framework
-            << " with checkpointing "
-            << (framework->info.checkpoint() ? "enabled" : "disabled")
-            << " and capabilities " << framework->info.capabilities();
+  scheduler::Call::Subscribe call;
+  call.mutable_framework_info()->CopyFrom(frameworkInfo);
+  call.set_force(failover);
 
-  addFramework(framework);
-
-  FrameworkRegisteredMessage message;
-  message.mutable_framework_id()->MergeFrom(framework->id());
-  message.mutable_master_info()->MergeFrom(info_);
-  framework->send(message);
+  subscribe(from, call);
 }
 
 
-void Master::reregisterFramework(
+void Master::subscribe(
     const UPID& from,
-    const FrameworkInfo& frameworkInfo,
-    bool failover)
+    const scheduler::Call::Subscribe& subscribe)
 {
-  ++metrics->messages_reregister_framework;
+  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
 
   if (authenticating.contains(from)) {
-    LOG(INFO) << "Queuing up re-registration request for framework "
-              << frameworkInfo.id() << " (" << frameworkInfo.name() << ") at "
-              << from << " because authentication is still in progress";
-
     // TODO(vinod): Consider dropping this request and fix the tests
-    // to deal with the drop. See 'Master::registerFramework()' for
-    // more details.
+    // to deal with the drop. Currently there is a race between master
+    // realizing the framework is authenticated and framework sending
+    // a subscribe call. Dropping this message will cause the
+    // framework to retry slowing down the tests.
+    LOG(INFO) << "Queuing up SUBSCRIBE call for"
+              << " framework '" << frameworkInfo.name() << "' at " << from
+              << " because authentication is still in progress";
+
     authenticating[from]
-      .onReady(defer(self(),
-                     &Self::reregisterFramework,
-                     from,
-                     frameworkInfo,
-                     failover));
+      .onReady(defer(self(), &Self::subscribe, from, subscribe));
     return;
   }
 
   Option<Error> validationError = None();
 
-  if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
-    validationError = Error("Re-registering without 'id' set");
-  }
-
   // TODO(vinod): Deprecate this in favor of ACLs.
   if (validationError.isNone() && !roles.contains(frameworkInfo.role())) {
     validationError = Error("Role '" + frameworkInfo.role() + "' is not" +
@@ -1878,19 +1772,19 @@ void Master::reregisterFramework(
   }
 
   // TODO(vinod): Deprecate this in favor of authorization.
-  if (validationError.isNone()) {
-    if (frameworkInfo.user() == "root" && !flags.root_submissions) {
-      validationError = Error("User 'root' is not allowed to run frameworks"
-                              " without --root_submissions set");
-    }
+  if (validationError.isNone() &&
+      frameworkInfo.user() == "root" && !flags.root_submissions) {
+    validationError = Error("User 'root' is not allowed to run frameworks"
+                            " without --root_submissions set");
   }
 
-  if (validationError.isNone()) {
+  if (validationError.isNone() && frameworkInfo.has_id()) {
     foreach (const shared_ptr<Framework>& framework, frameworks.completed) {
       if (framework->id() == frameworkInfo.id()) {
-        // This could happen if a framework tries to re-register after
+        // This could happen if a framework tries to subscribe after
         // its failover timeout has elapsed or it unregistered itself
         // by calling 'stop()' on the scheduler driver.
+        //
         // TODO(vinod): Master should persist admitted frameworks to the
         // registry and remove them from it after failover timeout.
         validationError = Error("Framework has been removed");
@@ -1905,7 +1799,7 @@ void Master::reregisterFramework(
   }
 
   if (validationError.isSome()) {
-    LOG(INFO) << "Refusing re-registration of framework"
+    LOG(INFO) << "Refusing subscription of framework"
               << " '" << frameworkInfo.name() << "' at " << from << ": "
               << validationError.get().message;
 
@@ -1915,9 +1809,8 @@ void Master::reregisterFramework(
     return;
   }
 
-  LOG(INFO) << "Received re-registration request from framework "
-            << frameworkInfo.id() << " (" << frameworkInfo.name()
-            << ") at " << from;
+  LOG(INFO) << "Received SUBSCRIBE call for"
+            << " framework '" << frameworkInfo.name() << "' at " << from;
 
   // We allow an authenticated framework to not specify a principal
   // in FrameworkInfo but we'd prefer if it did so we log a WARNING
@@ -1930,20 +1823,20 @@ void Master::reregisterFramework(
 
   authorizeFramework(frameworkInfo)
     .onAny(defer(self(),
-                 &Master::_reregisterFramework,
+                 &Master::_subscribe,
                  from,
-                 frameworkInfo,
-                 failover,
+                 subscribe,
                  lambda::_1));
 }
 
 
-void Master::_reregisterFramework(
+void Master::_subscribe(
     const UPID& from,
-    const FrameworkInfo& frameworkInfo,
-    bool failover,
+    const scheduler::Call::Subscribe& subscribe,
     const Future<bool>& authorized)
 {
+  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
+
   CHECK(!authorized.isDiscarded());
 
   Option<Error> authorizationError = None();
@@ -1957,7 +1850,7 @@ void Master::_reregisterFramework(
   }
 
   if (authorizationError.isSome()) {
-    LOG(INFO) << "Refusing re-registration of framework"
+    LOG(INFO) << "Refusing subscription of framework"
               << " '" << frameworkInfo.name() << "' at " << from
               << ": " << authorizationError.get().message;
 
@@ -1969,28 +1862,59 @@ void Master::_reregisterFramework(
 
   // At this point, authentications errors will be due to
   // re-authentication during the authorization process,
-  // so we drop the re-registration. It is important to
-  // drop this because if this request is from a failing
-  // over framework (pid = from) we don't want to failover
-  // the already registered framework (pid = framework->pid).
+  // so we drop the subscription.
   Option<Error> authenticationError =
     validateFrameworkAuthentication(frameworkInfo, from);
 
   if (authenticationError.isSome()) {
-    LOG(INFO) << "Dropping re-registration request for framework"
+    LOG(INFO) << "Dropping SUBSCRIBE call for framework"
               << " '" << frameworkInfo.name() << "' at " << from
               << ": " << authenticationError.get().message;
     return;
   }
 
-  LOG(INFO) << "Re-registering framework " << frameworkInfo.id()
-            << " (" << frameworkInfo.name() << ") " << " at " << from
+  LOG(INFO) << "Subscribing framework " << frameworkInfo.name()
             << " with checkpointing "
             << (frameworkInfo.checkpoint() ? "enabled" : "disabled")
             << " and capabilities " << frameworkInfo.capabilities();
 
+  if (!frameworkInfo.has_id() || frameworkInfo.id().value().empty()) {
+    // If we are here the framework is subscribing for the first time.
+
+    // Check if this framework is already subscribed (because it retries).
+    foreachvalue (Framework* framework, frameworks.registered) {
+      if (framework->pid == from) {
+        LOG(INFO) << "Framework " << *framework
+                  << " already subscribed, resending acknowledgement";
+        FrameworkRegisteredMessage message;
+        message.mutable_framework_id()->MergeFrom(framework->id());
+        message.mutable_master_info()->MergeFrom(info_);
+        framework->send(message);
+        return;
+      }
+    }
+
+    // Assign a new FrameworkID.
+    FrameworkInfo frameworkInfo_ = frameworkInfo;
+    frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
+
+    Framework* framework = new Framework(this, frameworkInfo_, from);
+
+    addFramework(framework);
+
+    FrameworkRegisteredMessage message;
+    message.mutable_framework_id()->MergeFrom(framework->id());
+    message.mutable_master_info()->MergeFrom(info_);
+    framework->send(message);
+
+    return;
+  }
+
+  // If we are here the framework has already been assigned an id.
+  CHECK(!frameworkInfo.id().value().empty());
+
   if (frameworks.registered.count(frameworkInfo.id()) > 0) {
-    // Using the "failover" of the scheduler allows us to keep a
+    // Using the "force" field of the scheduler allows us to keep a
     // scheduler that got partitioned but didn't die (in ZooKeeper
     // speak this means didn't lose their session) and then
     // eventually tried to connect to this master even though
@@ -2004,7 +1928,7 @@ void Master::_reregisterFramework(
       CHECK_NOTNULL(frameworks.registered[frameworkInfo.id()]);
 
     // Update the framework's info fields based on those passed during
-    // re-registration.
+    // subscription.
     LOG(INFO) << "Updating info for framework " << framework->id();
     framework->updateFrameworkInfo(frameworkInfo);
 
@@ -2012,21 +1936,20 @@ void Master::_reregisterFramework(
 
     framework->reregisteredTime = Clock::now();
 
-    if (failover) {
-      // We do not attempt to detect a duplicate re-registration
-      // message here because it is impossible to distinguish between
-      // a duplicate message, and a scheduler failover to the same
-      // pid, given the existing libprocess primitives (PID does not
-      // identify the libprocess Process instance).
-
+    if (subscribe.force()) {
       // TODO(benh): Should we check whether the new scheduler has
       // given us a different framework name, user name or executor
       // info?
+      // TODO(vinod): Now that the scheduler pid is unique we don't
+      // need to call 'failoverFramework()' if the pid hasn't changed
+      // (i.e., duplicate message). Instead we can just send the
+      // FrameworkReregisteredMessage back and activate the framework
+      // if necessary.
       LOG(INFO) << "Framework " << *framework << " failed over";
       failoverFramework(framework, from);
     } else if (framework->pid != from) {
       LOG(ERROR)
-        << "Disallowing re-registration attempt of framework " << *framework
+        << "Disallowing subscription attempt of framework " << *framework
         << " because it is not expected from " << from;
       FrameworkErrorMessage message;
       message.set_message("Framework failed over");
@@ -2034,7 +1957,7 @@ void Master::_reregisterFramework(
       return;
     } else {
       LOG(INFO) << "Allowing framework " << *framework
-                << " to re-register with an already used id";
+                << " to subscribe with an already used id";
 
       // Remove any offers sent to this framework.
       // NOTE: We need to do this because the scheduler might have
@@ -2072,8 +1995,6 @@ void Master::_reregisterFramework(
     // any tasks it has that have been reported by reconnecting slaves.
     Framework* framework = new Framework(this, frameworkInfo, from);
 
-    // TODO(benh): Check for root submissions like above!
-
     // Add active tasks and executors to the framework.
     foreachvalue (Slave* slave, slaves.registered) {
       foreachvalue (Task* task, slave->tasks[framework->id()]) {
@@ -2114,24 +2035,6 @@ void Master::_reregisterFramework(
     message.set_pid(from);
     send(slave->pid, message);
   }
-
-  return;
-}
-
-
-void Master::subscribe(
-    const UPID& from,
-    const scheduler::Call::Subscribe& subscribe)
-{
-  const FrameworkInfo& frameworkInfo = subscribe.framework_info();
-
-  // TODO(vinod): Instead of calling '(re-)registerFramework()' from
-  // here refactor those methods to call 'subscribe()'.
-  if (!frameworkInfo.has_id() || frameworkInfo.id() == "") {
-    registerFramework(from, frameworkInfo);
-  } else {
-    reregisterFramework(from, frameworkInfo, subscribe.force());
-  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b8bec202/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index ea18c4e..cd0a5c8 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -571,19 +571,6 @@ protected:
       const std::vector<ExecutorInfo>& executors,
       const std::vector<Task>& tasks);
 
-  // 'registerFramework()' continuation.
-  void _registerFramework(
-      const process::UPID& from,
-      const FrameworkInfo& frameworkInfo,
-      const process::Future<bool>& authorized);
-
-  // 'reregisterFramework()' continuation.
-  void _reregisterFramework(
-      const process::UPID& from,
-      const FrameworkInfo& frameworkInfo,
-      bool failover,
-      const process::Future<bool>& authorized);
-
   // Add a framework.
   void addFramework(Framework* framework);
 
@@ -714,6 +701,11 @@ private:
       const process::UPID& from,
       const scheduler::Call::Subscribe& subscribe);
 
+  void _subscribe(
+      const process::UPID& from,
+      const scheduler::Call::Subscribe& subscribe,
+      const process::Future<bool>& authorized);
+
   void accept(
       Framework* framework,
       const scheduler::Call::Accept& accept);

Reply via email to