Repository: mesos
Updated Branches:
  refs/heads/master a5a131024 -> b32f1b6f3


Moved Framework struct down in master.hpp.

Review: https://reviews.apache.org/r/36733


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b32f1b6f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b32f1b6f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b32f1b6f

Branch: refs/heads/master
Commit: b32f1b6f3915e3eee04992c816c86af3b165ea92
Parents: a5a1310
Author: Anand Mazumdar <[email protected]>
Authored: Mon Jul 27 14:04:55 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Mon Jul 27 14:04:55 2015 -0700

----------------------------------------------------------------------
 src/master/master.hpp | 1582 ++++++++++++++++++++++----------------------
 1 file changed, 792 insertions(+), 790 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b32f1b6f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 827d0d5..9f27f8b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -89,6 +89,8 @@ class Repairer;
 class SlaveObserver;
 
 struct BoundedRateLimiter;
+struct Framework;
+struct Role;
 
 
 struct Slave
@@ -330,319 +332,6 @@ inline std::ostream& operator << (std::ostream& stream, 
const Slave& slave)
 }
 
 
-// Information about a connected or completed framework.
-// TODO(bmahler): Keeping the task and executor information in sync
-// across the Slave and Framework structs is error prone!
-struct Framework
-{
-  Framework(const FrameworkInfo& _info,
-            const process::UPID& _pid,
-            const process::Time& time = process::Clock::now())
-    : info(_info),
-      pid(_pid),
-      connected(true),
-      active(true),
-      registeredTime(time),
-      reregisteredTime(time),
-      completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
-
-  ~Framework() {}
-
-  Task* getTask(const TaskID& taskId)
-  {
-    if (tasks.count(taskId) > 0) {
-      return tasks[taskId];
-    } else {
-      return NULL;
-    }
-  }
-
-  void addTask(Task* task)
-  {
-    CHECK(!tasks.contains(task->task_id()))
-      << "Duplicate task " << task->task_id()
-      << " of framework " << task->framework_id();
-
-    tasks[task->task_id()] = task;
-
-    if (!protobuf::isTerminalState(task->state())) {
-      totalUsedResources += task->resources();
-      usedResources[task->slave_id()] += task->resources();
-    }
-  }
-
-  // Notification of task termination, for resource accounting.
-  // TODO(bmahler): This is a hack for performance. We need to
-  // maintain resource counters because computing task resources
-  // functionally for all tasks is expensive, for now.
-  void taskTerminated(Task* task)
-  {
-    CHECK(protobuf::isTerminalState(task->state()));
-    CHECK(tasks.contains(task->task_id()))
-      << "Unknown task " << task->task_id()
-      << " of framework " << task->framework_id();
-
-    totalUsedResources -= task->resources();
-    usedResources[task->slave_id()] -= task->resources();
-    if (usedResources[task->slave_id()].empty()) {
-      usedResources.erase(task->slave_id());
-    }
-  }
-
-  void addCompletedTask(const Task& task)
-  {
-    // TODO(adam-mesos): Check if completed task already exists.
-    completedTasks.push_back(std::shared_ptr<Task>(new Task(task)));
-  }
-
-  void removeTask(Task* task)
-  {
-    CHECK(tasks.contains(task->task_id()))
-      << "Unknown task " << task->task_id()
-      << " of framework " << task->framework_id();
-
-    if (!protobuf::isTerminalState(task->state())) {
-      totalUsedResources -= task->resources();
-      usedResources[task->slave_id()] -= task->resources();
-      if (usedResources[task->slave_id()].empty()) {
-        usedResources.erase(task->slave_id());
-      }
-    }
-
-    addCompletedTask(*task);
-
-    tasks.erase(task->task_id());
-  }
-
-  void addOffer(Offer* offer)
-  {
-    CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
-    offers.insert(offer);
-    totalOfferedResources += offer->resources();
-    offeredResources[offer->slave_id()] += offer->resources();
-  }
-
-  void removeOffer(Offer* offer)
-  {
-    CHECK(offers.find(offer) != offers.end())
-      << "Unknown offer " << offer->id();
-
-    totalOfferedResources -= offer->resources();
-    offeredResources[offer->slave_id()] -= offer->resources();
-    if (offeredResources[offer->slave_id()].empty()) {
-      offeredResources.erase(offer->slave_id());
-    }
-
-    offers.erase(offer);
-  }
-
-  bool hasExecutor(const SlaveID& slaveId,
-                   const ExecutorID& executorId)
-  {
-    return executors.contains(slaveId) &&
-      executors[slaveId].contains(executorId);
-  }
-
-  void addExecutor(const SlaveID& slaveId,
-                   const ExecutorInfo& executorInfo)
-  {
-    CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
-      << "Duplicate executor " << executorInfo.executor_id()
-      << " on slave " << slaveId;
-
-    executors[slaveId][executorInfo.executor_id()] = executorInfo;
-    totalUsedResources += executorInfo.resources();
-    usedResources[slaveId] += executorInfo.resources();
-  }
-
-  void removeExecutor(const SlaveID& slaveId,
-                      const ExecutorID& executorId)
-  {
-    CHECK(hasExecutor(slaveId, executorId))
-      << "Unknown executor " << executorId
-      << " of framework " << id()
-      << " of slave " << slaveId;
-
-    totalUsedResources -= executors[slaveId][executorId].resources();
-    usedResources[slaveId] -= executors[slaveId][executorId].resources();
-    if (usedResources[slaveId].empty()) {
-      usedResources.erase(slaveId);
-    }
-
-    executors[slaveId].erase(executorId);
-    if (executors[slaveId].empty()) {
-      executors.erase(slaveId);
-    }
-  }
-
-  const FrameworkID id() const { return info.id(); }
-
-  // Update fields in 'info' using those in 'source'. Currently this
-  // only updates 'name', 'failover_timeout', 'hostname', and
-  // 'webui_url'.
-  void updateFrameworkInfo(const FrameworkInfo& source)
-  {
-    // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because
-    // of MESOS-2559. Once this is fixed we can 'CHECK' that we only
-    // merge 'info' from the same framework 'id'.
-
-    // TODO(jmlvanre): Merge other fields as per design doc in
-    // MESOS-703.
-
-    if (source.user() != info.user()) {
-      LOG(WARNING) << "Can not update FrameworkInfo.user to '" << info.user()
-                   << "' for framework " << id() << ". Check MESOS-703";
-    }
-
-    info.set_name(source.name());
-
-    if (source.has_failover_timeout()) {
-      info.set_failover_timeout(source.failover_timeout());
-    } else {
-      info.clear_failover_timeout();
-    }
-
-    if (source.checkpoint() != info.checkpoint()) {
-      LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '"
-                   << stringify(info.checkpoint()) << "' for framework " << 
id()
-                   << ". Check MESOS-703";
-    }
-
-    if (source.role() != info.role()) {
-      LOG(WARNING) << "Can not update FrameworkInfo.role to '" << info.role()
-                   << "' for framework " << id() << ". Check MESOS-703";
-    }
-
-    if (source.has_hostname()) {
-      info.set_hostname(source.hostname());
-    } else {
-      info.clear_hostname();
-    }
-
-    if (source.principal() != info.principal()) {
-      LOG(WARNING) << "Can not update FrameworkInfo.principal to '"
-                   << info.principal() << "' for framework " << id()
-                   << ". Check MESOS-703";
-    }
-
-    if (source.has_webui_url()) {
-      info.set_webui_url(source.webui_url());
-    } else {
-      info.clear_webui_url();
-    }
-  }
-
-  FrameworkInfo info;
-
-  process::UPID pid;
-
-  // Framework becomes disconnected when the socket closes.
-  bool connected;
-
-  // Framework becomes deactivated when it is disconnected or
-  // the master receives a DeactivateFrameworkMessage.
-  // No offers will be made to a deactivated framework.
-  bool active;
-
-  process::Time registeredTime;
-  process::Time reregisteredTime;
-  process::Time unregisteredTime;
-
-  // Tasks that have not yet been launched because they are currently
-  // being authorized.
-  hashmap<TaskID, TaskInfo> pendingTasks;
-
-  hashmap<TaskID, Task*> tasks;
-
-  // NOTE: We use a shared pointer for Task because clang doesn't like
-  // Boost's implementation of circular_buffer with Task (Boost
-  // attempts to do some memset's which are unsafe).
-  boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
-
-  hashset<Offer*> offers; // Active offers for framework.
-
-  hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors;
-
-  // NOTE: For the used and offered resources below, we keep the
-  // total as well as partitioned by SlaveID.
-  // We expose the total resources via the HTTP endpoint, and we
-  // keep a running total of the resources because looping over the
-  // slaves to sum the resources has led to perf issues (MESOS-1862).
-  // We keep the resources partitioned by SlaveID because non-scalar
-  // resources can be lost when summing them up across multiple
-  // slaves (MESOS-2373).
-  //
-  // Also note that keeping the totals is safe even though it yields
-  // incorrect results for non-scalar resources.
-  //   (1) For overlapping set items / ranges across slaves, these
-  //       will get added N times but only represented once.
-  //   (2) When an initial subtraction occurs (N-1), the resource is
-  //       no longer represented. (This is the source of the bug).
-  //   (3) When any further subtractions occur (N-(1+M)), the
-  //       Resources simply ignores the subtraction since there's
-  //       nothing to remove, so this is safe for now.
-
-  // TODO(mpark): Strip the non-scalar resources out of the totals
-  // in order to avoid reporting incorrect statistics (MESOS-2623).
-
-  // Active task / executor resources.
-  Resources totalUsedResources;
-  hashmap<SlaveID, Resources> usedResources;
-
-  // Offered resources.
-  Resources totalOfferedResources;
-  hashmap<SlaveID, Resources> offeredResources;
-
-private:
-  Framework(const Framework&);              // No copying.
-  Framework& operator = (const Framework&); // No assigning.
-};
-
-
-inline std::ostream& operator << (
-    std::ostream& stream,
-    const Framework& framework)
-{
-  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
-  // updated on framework failover (MESOS-1784).
-  return stream << framework.id() << " (" << framework.info.name()
-                << ") at " << framework.pid;
-}
-
-
-// Information about an active role.
-struct Role
-{
-  explicit Role(const mesos::master::RoleInfo& _info)
-    : info(_info) {}
-
-  void addFramework(Framework* framework)
-  {
-    frameworks[framework->id()] = framework;
-  }
-
-  void removeFramework(Framework* framework)
-  {
-    frameworks.erase(framework->id());
-  }
-
-  Resources resources() const
-  {
-    Resources resources;
-    foreachvalue (Framework* framework, frameworks) {
-      resources += framework->totalUsedResources;
-      resources += framework->totalOfferedResources;
-    }
-
-    return resources;
-  }
-
-  mesos::master::RoleInfo info;
-
-  hashmap<FrameworkID, Framework*> frameworks;
-};
-
-
 class Master : public ProtobufProcess<Master>
 {
 public:
@@ -908,617 +597,930 @@ protected:
   // executors and recover the resources.
   void removeFramework(Slave* slave, Framework* framework);
 
-  void disconnect(Framework* framework);
-  void deactivate(Framework* framework);
+  void disconnect(Framework* framework);
+  void deactivate(Framework* framework);
+
+  void disconnect(Slave* slave);
+  void deactivate(Slave* slave);
+
+  // Add a slave.
+  void addSlave(
+      Slave* slave,
+      const std::vector<Archive::Framework>& completedFrameworks =
+        std::vector<Archive::Framework>());
+
+  // Remove the slave from the registrar. Called when the slave
+  // does not re-register in time after a master failover.
+  Nothing removeSlave(const Registry::Slave& slave);
+
+  // Remove the slave from the registrar and from the master's state.
+  //
+  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
+  void removeSlave(
+      Slave* slave,
+      const std::string& message,
+      Option<process::metrics::Counter> reason = None());
+
+  void _removeSlave(
+      const SlaveInfo& slaveInfo,
+      const std::vector<StatusUpdate>& updates,
+      const process::Future<bool>& removed,
+      const std::string& message,
+      Option<process::metrics::Counter> reason = None());
+
+  // Authorizes the task.
+  // Returns true if task is authorized.
+  // Returns false if task is not authorized.
+  // Returns failure for transient authorization failures.
+  process::Future<bool> authorizeTask(
+      const TaskInfo& task,
+      Framework* framework);
+
+  // Add the task and its executor (if not already running) to the
+  // framework and slave. Returns the resources consumed as a result,
+  // which includes resources for the task and its executor
+  // (if not already running).
+  Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
+
+  // Transitions the task, and recovers resources if the task becomes
+  // terminal.
+  void updateTask(Task* task, const StatusUpdate& update);
+
+  // Removes the task.
+  void removeTask(Task* task);
+
+  // Remove an executor and recover its resources.
+  void removeExecutor(
+      Slave* slave,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId);
+
+  // Updates slave's resources by applying the given operation. It
+  // also updates the allocator and sends a CheckpointResourcesMessage
+  // to the slave with slave's current checkpointed resources.
+  void applyOfferOperation(
+      Framework* framework,
+      Slave* slave,
+      const Offer::Operation& operation);
+
+  // Forwards the update to the framework.
+  void forward(
+      const StatusUpdate& update,
+      const process::UPID& acknowledgee,
+      Framework* framework);
+
+  // Remove an offer after specified timeout
+  void offerTimeout(const OfferID& offerId);
+
+  // Remove an offer and optionally rescind the offer as well.
+  void removeOffer(Offer* offer, bool rescind = false);
+
+  Framework* getFramework(const FrameworkID& frameworkId);
+  Offer* getOffer(const OfferID& offerId);
+
+  FrameworkID newFrameworkId();
+  OfferID newOfferId();
+  SlaveID newSlaveId();
+
+  Option<Credentials> credentials;
+
+private:
+  void drop(
+      const process::UPID& from,
+      const scheduler::Call& call,
+      const std::string& message);
+
+  void drop(
+      Framework* framework,
+      const Offer::Operation& operation,
+      const std::string& message);
+
+  // Call handlers.
+  void receive(
+      const process::UPID& from,
+      const scheduler::Call& call);
+
+  void subscribe(
+      const process::UPID& from,
+      const scheduler::Call::Subscribe& subscribe);
+
+  void accept(
+      Framework* framework,
+      const scheduler::Call::Accept& accept);
+
+  void _accept(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    const Resources& offeredResources,
+    const scheduler::Call::Accept& accept,
+    const process::Future<std::list<process::Future<bool>>>& authorizations);
+
+  void decline(
+      Framework* framework,
+      const scheduler::Call::Decline& decline);
+
+  void revive(Framework* framework);
+
+  void kill(
+      Framework* framework,
+      const scheduler::Call::Kill& kill);
+
+  void shutdown(
+      Framework* framework,
+      const scheduler::Call::Shutdown& shutdown);
+
+  void acknowledge(
+      Framework* framework,
+      const scheduler::Call::Acknowledge& acknowledge);
+
+  void reconcile(
+      Framework* framework,
+      const scheduler::Call::Reconcile& reconcile);
+
+  void message(
+      Framework* framework,
+      const scheduler::Call::Message& message);
+
+  void request(
+      Framework* framework,
+      const scheduler::Call::Request& request);
+
+  bool elected() const
+  {
+    return leader.isSome() && leader.get() == info_;
+  }
+
+  // Inner class used to namespace HTTP route handlers (see
+  // master/http.cpp for implementations).
+  class Http
+  {
+  public:
+    explicit Http(Master* _master) : master(_master) {}
+
+    // Logs the request, route handlers can compose this with the
+    // desired request handler to get consistent request logging.
+    static void log(const process::http::Request& request);
+
+    // /master/call
+    process::Future<process::http::Response> call(
+        const process::http::Request& request) const;
+
+    // /master/health
+    process::Future<process::http::Response> health(
+        const process::http::Request& request) const;
+
+    // /master/observe
+    process::Future<process::http::Response> observe(
+        const process::http::Request& request) const;
+
+    // /master/redirect
+    process::Future<process::http::Response> redirect(
+        const process::http::Request& request) const;
+
+    // /master/roles.json
+    process::Future<process::http::Response> roles(
+        const process::http::Request& request) const;
+
+    // /master/teardown and /master/shutdown (deprecated).
+    process::Future<process::http::Response> teardown(
+        const process::http::Request& request) const;
+
+    // /master/slaves
+    process::Future<process::http::Response> slaves(
+        const process::http::Request& request) const;
+
+    // /master/state.json
+    process::Future<process::http::Response> state(
+        const process::http::Request& request) const;
+
+    // /master/state-summary
+    process::Future<process::http::Response> stateSummary(
+        const process::http::Request& request) const;
+
+    // /master/tasks.json
+    process::Future<process::http::Response> tasks(
+        const process::http::Request& request) const;
+
+    const static std::string CALL_HELP;
+    const static std::string HEALTH_HELP;
+    const static std::string OBSERVE_HELP;
+    const static std::string REDIRECT_HELP;
+    const static std::string ROLES_HELP;
+    const static std::string TEARDOWN_HELP;
+    const static std::string SLAVES_HELP;
+    const static std::string STATE_HELP;
+    const static std::string STATESUMMARY_HELP;
+    const static std::string TASKS_HELP;
+
+  private:
+    // Helper for doing authentication, returns the credential used if
+    // the authentication was successful (or none if no credentials
+    // have been given to the master), otherwise an Error.
+    Result<Credential> authenticate(
+        const process::http::Request& request) const;
+
+    // Continuations.
+    process::Future<process::http::Response> _teardown(
+        const FrameworkID& id,
+        bool authorized = true) const;
+
+    Master* master;
+  };
+
+  Master(const Master&);              // No copying.
+  Master& operator = (const Master&); // No assigning.
+
+  friend struct Metrics;
+
+  // NOTE: Since 'getOffer' and 'slaves' are protected,
+  // we need to make the following functions friends.
+  friend Offer* validation::offer::getOffer(
+      Master* master, const OfferID& offerId);
+
+  friend Slave* validation::offer::getSlave(
+      Master* master, const SlaveID& slaveId);
+
+  const Flags flags;
 
-  void disconnect(Slave* slave);
-  void deactivate(Slave* slave);
+  Option<MasterInfo> leader; // Current leading master.
 
-  // Add a slave.
-  void addSlave(
-      Slave* slave,
-      const std::vector<Archive::Framework>& completedFrameworks =
-        std::vector<Archive::Framework>());
+  mesos::master::allocator::Allocator* allocator;
+  WhitelistWatcher* whitelistWatcher;
+  Registrar* registrar;
+  Repairer* repairer;
+  Files* files;
 
-  // Remove the slave from the registrar. Called when the slave
-  // does not re-register in time after a master failover.
-  Nothing removeSlave(const Registry::Slave& slave);
+  MasterContender* contender;
+  MasterDetector* detector;
 
-  // Remove the slave from the registrar and from the master's state.
-  //
-  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
-  void removeSlave(
-      Slave* slave,
-      const std::string& message,
-      Option<process::metrics::Counter> reason = None());
+  const Option<Authorizer*> authorizer;
 
-  void _removeSlave(
-      const SlaveInfo& slaveInfo,
-      const std::vector<StatusUpdate>& updates,
-      const process::Future<bool>& removed,
-      const std::string& message,
-      Option<process::metrics::Counter> reason = None());
+  MasterInfo info_;
 
-  // Authorizes the task.
-  // Returns true if task is authorized.
-  // Returns false if task is not authorized.
-  // Returns failure for transient authorization failures.
-  process::Future<bool> authorizeTask(
-      const TaskInfo& task,
-      Framework* framework);
+  // Indicates when recovery is complete. Recovery begins once the
+  // master is elected as a leader.
+  Option<process::Future<Nothing>> recovered;
 
-  // Add the task and its executor (if not already running) to the
-  // framework and slave. Returns the resources consumed as a result,
-  // which includes resources for the task and its executor
-  // (if not already running).
-  Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
+  struct Slaves
+  {
+    Slaves() : removed(MAX_REMOVED_SLAVES) {}
 
-  // Transitions the task, and recovers resources if the task becomes
-  // terminal.
-  void updateTask(Task* task, const StatusUpdate& update);
+    // Imposes a time limit for slaves that we recover from the
+    // registry to re-register with the master.
+    Option<process::Timer> recoveredTimer;
 
-  // Removes the task.
-  void removeTask(Task* task);
+    // Slaves that have been recovered from the registrar but have yet
+    // to re-register. We keep a "reregistrationTimer" above to ensure
+    // we remove these slaves if they do not re-register.
+    hashset<SlaveID> recovered;
 
-  // Remove an executor and recover its resources.
-  void removeExecutor(
-      Slave* slave,
-      const FrameworkID& frameworkId,
-      const ExecutorID& executorId);
+    // Slaves that are in the process of registering.
+    hashset<process::UPID> registering;
 
-  // Updates slave's resources by applying the given operation. It
-  // also updates the allocator and sends a CheckpointResourcesMessage
-  // to the slave with slave's current checkpointed resources.
-  void applyOfferOperation(
-      Framework* framework,
-      Slave* slave,
-      const Offer::Operation& operation);
+    // Only those slaves that are re-registering for the first time
+    // with this master. We must not answer questions related to
+    // these slaves until the registrar determines their fate.
+    hashset<SlaveID> reregistering;
 
-  // Forwards the update to the framework.
-  void forward(
-      const StatusUpdate& update,
-      const process::UPID& acknowledgee,
-      Framework* framework);
+    // Registered slaves are indexed by SlaveID and UPID. Note that
+    // iteration is supported but is exposed as iteration over a
+    // hashmap<SlaveID, Slave*> since it is tedious to convert
+    // the map's key/value iterator into a value iterator.
+    //
+    // TODO(bmahler): Consider pulling in boost's multi_index,
+    // or creating a simpler indexing abstraction in stout.
+    struct
+    {
+      bool contains(const SlaveID& slaveId) const
+      {
+        return ids.contains(slaveId);
+      }
 
-  // Remove an offer after specified timeout
-  void offerTimeout(const OfferID& offerId);
+      bool contains(const process::UPID& pid) const
+      {
+        return pids.contains(pid);
+      }
 
-  // Remove an offer and optionally rescind the offer as well.
-  void removeOffer(Offer* offer, bool rescind = false);
+      Slave* get(const SlaveID& slaveId) const
+      {
+        return ids.get(slaveId).getOrElse(NULL);
+      }
 
-  Framework* getFramework(const FrameworkID& frameworkId);
-  Offer* getOffer(const OfferID& offerId);
+      Slave* get(const process::UPID& pid) const
+      {
+        return pids.get(pid).getOrElse(NULL);
+      }
 
-  FrameworkID newFrameworkId();
-  OfferID newOfferId();
-  SlaveID newSlaveId();
+      void put(Slave* slave)
+      {
+        CHECK_NOTNULL(slave);
+        ids[slave->id] = slave;
+        pids[slave->pid] = slave;
+      }
 
-  Option<Credentials> credentials;
+      void remove(Slave* slave)
+      {
+        CHECK_NOTNULL(slave);
+        ids.erase(slave->id);
+        pids.erase(slave->pid);
+      }
 
-private:
-  void drop(
-      const process::UPID& from,
-      const scheduler::Call& call,
-      const std::string& message);
+      void clear()
+      {
+        ids.clear();
+        pids.clear();
+      }
 
-  void drop(
-      Framework* framework,
-      const Offer::Operation& operation,
-      const std::string& message);
+      size_t size() const { return ids.size(); }
 
-  // Call handlers.
-  void receive(
-      const process::UPID& from,
-      const scheduler::Call& call);
+      typedef hashmap<SlaveID, Slave*>::iterator iterator;
+      typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
 
-  void subscribe(
-      const process::UPID& from,
-      const scheduler::Call::Subscribe& subscribe);
+      iterator begin() { return ids.begin(); }
+      iterator end()   { return ids.end();   }
 
-  void accept(
-      Framework* framework,
-      const scheduler::Call::Accept& accept);
+      const_iterator begin() const { return ids.begin(); }
+      const_iterator end()   const { return ids.end();   }
 
-  void _accept(
-    const FrameworkID& frameworkId,
-    const SlaveID& slaveId,
-    const Resources& offeredResources,
-    const scheduler::Call::Accept& accept,
-    const process::Future<std::list<process::Future<bool>>>& authorizations);
+    private:
+      hashmap<SlaveID, Slave*> ids;
+      hashmap<process::UPID, Slave*> pids;
+    } registered;
 
-  void decline(
-      Framework* framework,
-      const scheduler::Call::Decline& decline);
+    // Slaves that are in the process of being removed from the
+    // registrar. Think of these as being partially removed: we must
+    // not answer questions related to these until they are removed
+    // from the registry.
+    hashset<SlaveID> removing;
 
-  void revive(Framework* framework);
+    // We track removed slaves to preserve the consistency
+    // semantics of the pre-registrar code when a non-strict registrar
+    // is being used. That is, if we remove a slave, we must make
+    // an effort to prevent it from (re-)registering, sending updates,
+    // etc. We keep a cache here to prevent this from growing in an
+    // unbounded manner.
+    // TODO(bmahler): Ideally we could use a cache with set semantics.
+    Cache<SlaveID, Nothing> removed;
 
-  void kill(
-      Framework* framework,
-      const scheduler::Call::Kill& kill);
+    // This rate limiter is used to limit the removal of slaves failing
+    // health checks.
+    // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
+    // a wrapper around libprocess process which is thread safe.
+    Option<std::shared_ptr<process::RateLimiter>> limiter;
 
-  void shutdown(
-      Framework* framework,
-      const scheduler::Call::Shutdown& shutdown);
+    bool transitioning(const Option<SlaveID>& slaveId)
+    {
+      if (slaveId.isSome()) {
+        return recovered.contains(slaveId.get()) ||
+               reregistering.contains(slaveId.get()) ||
+               removing.contains(slaveId.get());
+      } else {
+        return !recovered.empty() ||
+               !reregistering.empty() ||
+               !removing.empty();
+      }
+    }
+  } slaves;
 
-  void acknowledge(
-      Framework* framework,
-      const scheduler::Call::Acknowledge& acknowledge);
+  struct Frameworks
+  {
+    Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {}
 
-  void reconcile(
-      Framework* framework,
-      const scheduler::Call::Reconcile& reconcile);
+    hashmap<FrameworkID, Framework*> registered;
+    boost::circular_buffer<std::shared_ptr<Framework>> completed;
 
-  void message(
-      Framework* framework,
-      const scheduler::Call::Message& message);
+    // Principals of frameworks keyed by PID.
+    // NOTE: Multiple PIDs can map to the same principal. The
+    // principal is None when the framework doesn't specify it.
+    // The differences between this map and 'authenticated' are:
+    // 1) This map only includes *registered* frameworks. The mapping
+    //    is added when a framework (re-)registers.
+    // 2) This map includes unauthenticated frameworks (when Master
+    //    allows them) if they have principals specified in
+    //    FrameworkInfo.
+    hashmap<process::UPID, Option<std::string>> principals;
 
-  void request(
-      Framework* framework,
-      const scheduler::Call::Request& request);
+    // BoundedRateLimiters keyed by the framework principal.
+    // Like Metrics::Frameworks, all frameworks of the same principal
+    // are throttled together at a common rate limit.
+    hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
 
-  bool elected() const
-  {
-    return leader.isSome() && leader.get() == info_;
-  }
+    // The default limiter is for frameworks not specified in
+    // 'flags.rate_limits'.
+    Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
+  } frameworks;
 
-  // Inner class used to namespace HTTP route handlers (see
-  // master/http.cpp for implementations).
-  class Http
-  {
-  public:
-    explicit Http(Master* _master) : master(_master) {}
+  hashmap<OfferID, Offer*> offers;
+  hashmap<OfferID, process::Timer> offerTimers;
 
-    // Logs the request, route handlers can compose this with the
-    // desired request handler to get consistent request logging.
-    static void log(const process::http::Request& request);
+  hashmap<std::string, Role*> roles;
 
-    // /master/call
-    process::Future<process::http::Response> call(
-        const process::http::Request& request) const;
+  // Authenticator names as supplied via flags.
+  std::vector<std::string> authenticatorNames;
 
-    // /master/health
-    process::Future<process::http::Response> health(
-        const process::http::Request& request) const;
+  Option<Authenticator*> authenticator;
 
-    // /master/observe
-    process::Future<process::http::Response> observe(
-        const process::http::Request& request) const;
+  // Frameworks/slaves that are currently in the process of authentication.
+  // 'authenticating' future is completed when authenticator
+  // completes authentication.
+  // The future is removed from the map when master completes authentication.
+  hashmap<process::UPID, process::Future<Option<std::string>>> authenticating;
 
-    // /master/redirect
-    process::Future<process::http::Response> redirect(
-        const process::http::Request& request) const;
+  // Principals of authenticated frameworks/slaves keyed by PID.
+  hashmap<process::UPID, std::string> authenticated;
 
-    // /master/roles.json
-    process::Future<process::http::Response> roles(
-        const process::http::Request& request) const;
+  int64_t nextFrameworkId; // Used to give each framework a unique ID.
+  int64_t nextOfferId;     // Used to give each slot offer a unique ID.
+  int64_t nextSlaveId;     // Used to give each slave a unique ID.
 
-    // /master/teardown and /master/shutdown (deprecated).
-    process::Future<process::http::Response> teardown(
-        const process::http::Request& request) const;
+  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
+  // thread safe.
+  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
+  // copyable metric types only.
+  std::shared_ptr<Metrics> metrics;
 
-    // /master/slaves
-    process::Future<process::http::Response> slaves(
-        const process::http::Request& request) const;
+  // Gauge handlers.
+  double _uptime_secs()
+  {
+    return (process::Clock::now() - startTime).secs();
+  }
 
-    // /master/state.json
-    process::Future<process::http::Response> state(
-        const process::http::Request& request) const;
+  double _elected()
+  {
+    return elected() ? 1 : 0;
+  }
 
-    // /master/state-summary
-    process::Future<process::http::Response> stateSummary(
-        const process::http::Request& request) const;
+  double _slaves_connected();
+  double _slaves_disconnected();
+  double _slaves_active();
+  double _slaves_inactive();
 
-    // /master/tasks.json
-    process::Future<process::http::Response> tasks(
-        const process::http::Request& request) const;
+  double _frameworks_connected();
+  double _frameworks_disconnected();
+  double _frameworks_active();
+  double _frameworks_inactive();
 
-    const static std::string CALL_HELP;
-    const static std::string HEALTH_HELP;
-    const static std::string OBSERVE_HELP;
-    const static std::string REDIRECT_HELP;
-    const static std::string ROLES_HELP;
-    const static std::string TEARDOWN_HELP;
-    const static std::string SLAVES_HELP;
-    const static std::string STATE_HELP;
-    const static std::string STATESUMMARY_HELP;
-    const static std::string TASKS_HELP;
+  double _outstanding_offers()
+  {
+    return offers.size();
+  }
 
-  private:
-    // Helper for doing authentication, returns the credential used if
-    // the authentication was successful (or none if no credentials
-    // have been given to the master), otherwise an Error.
-    Result<Credential> authenticate(
-        const process::http::Request& request) const;
+  double _event_queue_messages()
+  {
+    return static_cast<double>(eventCount<process::MessageEvent>());
+  }
 
-    // Continuations.
-    process::Future<process::http::Response> _teardown(
-        const FrameworkID& id,
-        bool authorized = true) const;
+  double _event_queue_dispatches()
+  {
+    return static_cast<double>(eventCount<process::DispatchEvent>());
+  }
 
-    Master* master;
-  };
+  double _event_queue_http_requests()
+  {
+    return static_cast<double>(eventCount<process::HttpEvent>());
+  }
 
-  Master(const Master&);              // No copying.
-  Master& operator = (const Master&); // No assigning.
+  double _tasks_staging();
+  double _tasks_starting();
+  double _tasks_running();
 
-  friend struct Metrics;
+  double _resources_total(const std::string& name);
+  double _resources_used(const std::string& name);
+  double _resources_percent(const std::string& name);
 
-  // NOTE: Since 'getOffer' and 'slaves' are protected,
-  // we need to make the following functions friends.
-  friend Offer* validation::offer::getOffer(
-      Master* master, const OfferID& offerId);
+  double _resources_revocable_total(const std::string& name);
+  double _resources_revocable_used(const std::string& name);
+  double _resources_revocable_percent(const std::string& name);
 
-  friend Slave* validation::offer::getSlave(
-      Master* master, const SlaveID& slaveId);
+  process::Time startTime; // Start time used to calculate uptime.
 
-  const Flags flags;
+  Option<process::Time> electedTime; // Time when this master is elected.
 
-  Option<MasterInfo> leader; // Current leading master.
+  // Validates the framework including authorization.
+  // Returns None if the framework is valid.
+  // Returns Error if the framework is invalid.
+  // Returns Failure if authorization returns 'Failure'.
+  process::Future<Option<Error>> validate(
+      const FrameworkInfo& frameworkInfo,
+      const process::UPID& from);
+};
 
-  mesos::master::allocator::Allocator* allocator;
-  WhitelistWatcher* whitelistWatcher;
-  Registrar* registrar;
-  Repairer* repairer;
-  Files* files;
 
-  MasterContender* contender;
-  MasterDetector* detector;
+// Implementation of slave admission Registrar operation.
+class AdmitSlave : public Operation
+{
+public:
+  explicit AdmitSlave(const SlaveInfo& _info) : info(_info)
+  {
+    CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
+  }
 
-  const Option<Authorizer*> authorizer;
+protected:
+  virtual Try<bool> perform(
+      Registry* registry,
+      hashset<SlaveID>* slaveIDs,
+      bool strict)
+  {
+    // Check and see if this slave already exists.
+    if (slaveIDs->contains(info.id())) {
+      if (strict) {
+        return Error("Slave already admitted");
+      } else {
+        return false; // No mutation.
+      }
+    }
 
-  MasterInfo info_;
+    Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
+    slave->mutable_info()->CopyFrom(info);
+    slaveIDs->insert(info.id());
+    return true; // Mutation.
+  }
 
-  // Indicates when recovery is complete. Recovery begins once the
-  // master is elected as a leader.
-  Option<process::Future<Nothing>> recovered;
+private:
+  const SlaveInfo info;
+};
 
-  struct Slaves
-  {
-    Slaves() : removed(MAX_REMOVED_SLAVES) {}
 
-    // Imposes a time limit for slaves that we recover from the
-    // registry to re-register with the master.
-    Option<process::Timer> recoveredTimer;
+// Implementation of slave readmission Registrar operation.
+class ReadmitSlave : public Operation
+{
+public:
+  explicit ReadmitSlave(const SlaveInfo& _info) : info(_info)
+  {
+    CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
+  }
 
-    // Slaves that have been recovered from the registrar but have yet
-    // to re-register. We keep a "reregistrationTimer" above to ensure
-    // we remove these slaves if they do not re-register.
-    hashset<SlaveID> recovered;
+protected:
+  virtual Try<bool> perform(
+      Registry* registry,
+      hashset<SlaveID>* slaveIDs,
+      bool strict)
+  {
+    if (slaveIDs->contains(info.id())) {
+      return false; // No mutation.
+    }
 
-    // Slaves that are in the process of registering.
-    hashset<process::UPID> registering;
+    if (strict) {
+      return Error("Slave not yet admitted");
+    } else {
+      Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
+      slave->mutable_info()->CopyFrom(info);
+      slaveIDs->insert(info.id());
+      return true; // Mutation.
+    }
+  }
 
-    // Only those slaves that are re-registering for the first time
-    // with this master. We must not answer questions related to
-    // these slaves until the registrar determines their fate.
-    hashset<SlaveID> reregistering;
+private:
+  const SlaveInfo info;
+};
 
-    // Registered slaves are indexed by SlaveID and UPID. Note that
-    // iteration is supported but is exposed as iteration over a
-    // hashmap<SlaveID, Slave*> since it is tedious to convert
-    // the map's key/value iterator into a value iterator.
-    //
-    // TODO(bmahler): Consider pulling in boost's multi_index,
-    // or creating a simpler indexing abstraction in stout.
-    struct
-    {
-      bool contains(const SlaveID& slaveId) const
-      {
-        return ids.contains(slaveId);
-      }
 
-      bool contains(const process::UPID& pid) const
-      {
-        return pids.contains(pid);
-      }
+// Implementation of slave removal Registrar operation.
+class RemoveSlave : public Operation
+{
+public:
+  explicit RemoveSlave(const SlaveInfo& _info) : info(_info)
+  {
+    CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
+  }
 
-      Slave* get(const SlaveID& slaveId) const
-      {
-        return ids.get(slaveId).getOrElse(NULL);
+protected:
+  virtual Try<bool> perform(
+      Registry* registry,
+      hashset<SlaveID>* slaveIDs,
+      bool strict)
+  {
+    for (int i = 0; i < registry->slaves().slaves().size(); i++) {
+      const Registry::Slave& slave = registry->slaves().slaves(i);
+      if (slave.info().id() == info.id()) {
+        registry->mutable_slaves()->mutable_slaves()->DeleteSubrange(i, 1);
+        slaveIDs->erase(info.id());
+        return true; // Mutation.
       }
+    }
 
-      Slave* get(const process::UPID& pid) const
-      {
-        return pids.get(pid).getOrElse(NULL);
-      }
+    if (strict) {
+      return Error("Slave not yet admitted");
+    } else {
+      return false; // No mutation.
+    }
+  }
 
-      void put(Slave* slave)
-      {
-        CHECK_NOTNULL(slave);
-        ids[slave->id] = slave;
-        pids[slave->pid] = slave;
-      }
+private:
+  const SlaveInfo info;
+};
 
-      void remove(Slave* slave)
-      {
-        CHECK_NOTNULL(slave);
-        ids.erase(slave->id);
-        pids.erase(slave->pid);
-      }
 
-      void clear()
-      {
-        ids.clear();
-        pids.clear();
-      }
+// Information about a connected or completed framework.
+// TODO(bmahler): Keeping the task and executor information in sync
+// across the Slave and Framework structs is error prone!
+struct Framework
+{
+  Framework(const FrameworkInfo& _info,
+            const process::UPID& _pid,
+            const process::Time& time = process::Clock::now())
+    : info(_info),
+      pid(_pid),
+      connected(true),
+      active(true),
+      registeredTime(time),
+      reregisteredTime(time),
+      completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
 
-      size_t size() const { return ids.size(); }
+  ~Framework() {}
 
-      typedef hashmap<SlaveID, Slave*>::iterator iterator;
-      typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
+  Task* getTask(const TaskID& taskId)
+  {
+    if (tasks.count(taskId) > 0) {
+      return tasks[taskId];
+    } else {
+      return NULL;
+    }
+  }
 
-      iterator begin() { return ids.begin(); }
-      iterator end()   { return ids.end();   }
+  void addTask(Task* task)
+  {
+    CHECK(!tasks.contains(task->task_id()))
+      << "Duplicate task " << task->task_id()
+      << " of framework " << task->framework_id();
 
-      const_iterator begin() const { return ids.begin(); }
-      const_iterator end()   const { return ids.end();   }
+    tasks[task->task_id()] = task;
 
-    private:
-      hashmap<SlaveID, Slave*> ids;
-      hashmap<process::UPID, Slave*> pids;
-    } registered;
+    if (!protobuf::isTerminalState(task->state())) {
+      totalUsedResources += task->resources();
+      usedResources[task->slave_id()] += task->resources();
+    }
+  }
 
-    // Slaves that are in the process of being removed from the
-    // registrar. Think of these as being partially removed: we must
-    // not answer questions related to these until they are removed
-    // from the registry.
-    hashset<SlaveID> removing;
+  // Notification of task termination, for resource accounting.
+  // TODO(bmahler): This is a hack for performance. We need to
+  // maintain resource counters because computing task resources
+  // functionally for all tasks is expensive, for now.
+  void taskTerminated(Task* task)
+  {
+    CHECK(protobuf::isTerminalState(task->state()));
+    CHECK(tasks.contains(task->task_id()))
+      << "Unknown task " << task->task_id()
+      << " of framework " << task->framework_id();
 
-    // We track removed slaves to preserve the consistency
-    // semantics of the pre-registrar code when a non-strict registrar
-    // is being used. That is, if we remove a slave, we must make
-    // an effort to prevent it from (re-)registering, sending updates,
-    // etc. We keep a cache here to prevent this from growing in an
-    // unbounded manner.
-    // TODO(bmahler): Ideally we could use a cache with set semantics.
-    Cache<SlaveID, Nothing> removed;
+    totalUsedResources -= task->resources();
+    usedResources[task->slave_id()] -= task->resources();
+    if (usedResources[task->slave_id()].empty()) {
+      usedResources.erase(task->slave_id());
+    }
+  }
 
-    // This rate limiter is used to limit the removal of slaves failing
-    // health checks.
-    // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
-    // a wrapper around libprocess process which is thread safe.
-    Option<std::shared_ptr<process::RateLimiter>> limiter;
+  void addCompletedTask(const Task& task)
+  {
+    // TODO(adam-mesos): Check if completed task already exists.
+    completedTasks.push_back(std::shared_ptr<Task>(new Task(task)));
+  }
 
-    bool transitioning(const Option<SlaveID>& slaveId)
-    {
-      if (slaveId.isSome()) {
-        return recovered.contains(slaveId.get()) ||
-               reregistering.contains(slaveId.get()) ||
-               removing.contains(slaveId.get());
-      } else {
-        return !recovered.empty() ||
-               !reregistering.empty() ||
-               !removing.empty();
+  void removeTask(Task* task)
+  {
+    CHECK(tasks.contains(task->task_id()))
+      << "Unknown task " << task->task_id()
+      << " of framework " << task->framework_id();
+
+    if (!protobuf::isTerminalState(task->state())) {
+      totalUsedResources -= task->resources();
+      usedResources[task->slave_id()] -= task->resources();
+      if (usedResources[task->slave_id()].empty()) {
+        usedResources.erase(task->slave_id());
       }
     }
-  } slaves;
 
-  struct Frameworks
+    addCompletedTask(*task);
+
+    tasks.erase(task->task_id());
+  }
+
+  void addOffer(Offer* offer)
   {
-    Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {}
+    CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
+    offers.insert(offer);
+    totalOfferedResources += offer->resources();
+    offeredResources[offer->slave_id()] += offer->resources();
+  }
 
-    hashmap<FrameworkID, Framework*> registered;
-    boost::circular_buffer<std::shared_ptr<Framework>> completed;
+  void removeOffer(Offer* offer)
+  {
+    CHECK(offers.find(offer) != offers.end())
+      << "Unknown offer " << offer->id();
 
-    // Principals of frameworks keyed by PID.
-    // NOTE: Multiple PIDs can map to the same principal. The
-    // principal is None when the framework doesn't specify it.
-    // The differences between this map and 'authenticated' are:
-    // 1) This map only includes *registered* frameworks. The mapping
-    //    is added when a framework (re-)registers.
-    // 2) This map includes unauthenticated frameworks (when Master
-    //    allows them) if they have principals specified in
-    //    FrameworkInfo.
-    hashmap<process::UPID, Option<std::string>> principals;
+    totalOfferedResources -= offer->resources();
+    offeredResources[offer->slave_id()] -= offer->resources();
+    if (offeredResources[offer->slave_id()].empty()) {
+      offeredResources.erase(offer->slave_id());
+    }
 
-    // BoundedRateLimiters keyed by the framework principal.
-    // Like Metrics::Frameworks, all frameworks of the same principal
-    // are throttled together at a common rate limit.
-    hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters;
+    offers.erase(offer);
+  }
 
-    // The default limiter is for frameworks not specified in
-    // 'flags.rate_limits'.
-    Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
-  } frameworks;
+  bool hasExecutor(const SlaveID& slaveId,
+                   const ExecutorID& executorId)
+  {
+    return executors.contains(slaveId) &&
+      executors[slaveId].contains(executorId);
+  }
 
-  hashmap<OfferID, Offer*> offers;
-  hashmap<OfferID, process::Timer> offerTimers;
+  void addExecutor(const SlaveID& slaveId,
+                   const ExecutorInfo& executorInfo)
+  {
+    CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
+      << "Duplicate executor " << executorInfo.executor_id()
+      << " on slave " << slaveId;
 
-  hashmap<std::string, Role*> roles;
+    executors[slaveId][executorInfo.executor_id()] = executorInfo;
+    totalUsedResources += executorInfo.resources();
+    usedResources[slaveId] += executorInfo.resources();
+  }
 
-  // Authenticator names as supplied via flags.
-  std::vector<std::string> authenticatorNames;
+  void removeExecutor(const SlaveID& slaveId,
+                      const ExecutorID& executorId)
+  {
+    CHECK(hasExecutor(slaveId, executorId))
+      << "Unknown executor " << executorId
+      << " of framework " << id()
+      << " of slave " << slaveId;
 
-  Option<Authenticator*> authenticator;
+    totalUsedResources -= executors[slaveId][executorId].resources();
+    usedResources[slaveId] -= executors[slaveId][executorId].resources();
+    if (usedResources[slaveId].empty()) {
+      usedResources.erase(slaveId);
+    }
 
-  // Frameworks/slaves that are currently in the process of authentication.
-  // 'authenticating' future is completed when authenticator
-  // completes authentication.
-  // The future is removed from the map when master completes authentication.
-  hashmap<process::UPID, process::Future<Option<std::string>>> authenticating;
+    executors[slaveId].erase(executorId);
+    if (executors[slaveId].empty()) {
+      executors.erase(slaveId);
+    }
+  }
+
+  const FrameworkID id() const { return info.id(); }
+
+  // Update fields in 'info' using those in 'source'. Currently this
+  // only updates 'name', 'failover_timeout', 'hostname', and
+  // 'webui_url'.
+  void updateFrameworkInfo(const FrameworkInfo& source)
+  {
+    // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because
+    // of MESOS-2559. Once this is fixed we can 'CHECK' that we only
+    // merge 'info' from the same framework 'id'.
+
+    // TODO(jmlvanre): Merge other fields as per design doc in
+    // MESOS-703.
 
-  // Principals of authenticated frameworks/slaves keyed by PID.
-  hashmap<process::UPID, std::string> authenticated;
+    if (source.user() != info.user()) {
+      LOG(WARNING) << "Can not update FrameworkInfo.user to '" << info.user()
+                   << "' for framework " << id() << ". Check MESOS-703";
+    }
 
-  int64_t nextFrameworkId; // Used to give each framework a unique ID.
-  int64_t nextOfferId;     // Used to give each slot offer a unique ID.
-  int64_t nextSlaveId;     // Used to give each slave a unique ID.
+    info.set_name(source.name());
 
-  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
-  // thread safe.
-  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
-  // copyable metric types only.
-  std::shared_ptr<Metrics> metrics;
+    if (source.has_failover_timeout()) {
+      info.set_failover_timeout(source.failover_timeout());
+    } else {
+      info.clear_failover_timeout();
+    }
 
-  // Gauge handlers.
-  double _uptime_secs()
-  {
-    return (process::Clock::now() - startTime).secs();
-  }
+    if (source.checkpoint() != info.checkpoint()) {
+      LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '"
+                   << stringify(info.checkpoint()) << "' for framework " << 
id()
+                   << ". Check MESOS-703";
+    }
 
-  double _elected()
-  {
-    return elected() ? 1 : 0;
-  }
+    if (source.role() != info.role()) {
+      LOG(WARNING) << "Can not update FrameworkInfo.role to '" << info.role()
+                   << "' for framework " << id() << ". Check MESOS-703";
+    }
 
-  double _slaves_connected();
-  double _slaves_disconnected();
-  double _slaves_active();
-  double _slaves_inactive();
+    if (source.has_hostname()) {
+      info.set_hostname(source.hostname());
+    } else {
+      info.clear_hostname();
+    }
 
-  double _frameworks_connected();
-  double _frameworks_disconnected();
-  double _frameworks_active();
-  double _frameworks_inactive();
+    if (source.principal() != info.principal()) {
+      LOG(WARNING) << "Can not update FrameworkInfo.principal to '"
+                   << info.principal() << "' for framework " << id()
+                   << ". Check MESOS-703";
+    }
 
-  double _outstanding_offers()
-  {
-    return offers.size();
+    if (source.has_webui_url()) {
+      info.set_webui_url(source.webui_url());
+    } else {
+      info.clear_webui_url();
+    }
   }
 
-  double _event_queue_messages()
-  {
-    return static_cast<double>(eventCount<process::MessageEvent>());
-  }
+  FrameworkInfo info;
 
-  double _event_queue_dispatches()
-  {
-    return static_cast<double>(eventCount<process::DispatchEvent>());
-  }
+  process::UPID pid;
 
-  double _event_queue_http_requests()
-  {
-    return static_cast<double>(eventCount<process::HttpEvent>());
-  }
+  // Framework becomes disconnected when the socket closes.
+  bool connected;
 
-  double _tasks_staging();
-  double _tasks_starting();
-  double _tasks_running();
+  // Framework becomes deactivated when it is disconnected or
+  // the master receives a DeactivateFrameworkMessage.
+  // No offers will be made to a deactivated framework.
+  bool active;
 
-  double _resources_total(const std::string& name);
-  double _resources_used(const std::string& name);
-  double _resources_percent(const std::string& name);
+  process::Time registeredTime;
+  process::Time reregisteredTime;
+  process::Time unregisteredTime;
 
-  double _resources_revocable_total(const std::string& name);
-  double _resources_revocable_used(const std::string& name);
-  double _resources_revocable_percent(const std::string& name);
+  // Tasks that have not yet been launched because they are currently
+  // being authorized.
+  hashmap<TaskID, TaskInfo> pendingTasks;
 
-  process::Time startTime; // Start time used to calculate uptime.
+  hashmap<TaskID, Task*> tasks;
 
-  Option<process::Time> electedTime; // Time when this master is elected.
+  // NOTE: We use a shared pointer for Task because clang doesn't like
+  // Boost's implementation of circular_buffer with Task (Boost
+  // attempts to do some memset's which are unsafe).
+  boost::circular_buffer<std::shared_ptr<Task>> completedTasks;
 
-  // Validates the framework including authorization.
-  // Returns None if the framework is valid.
-  // Returns Error if the framework is invalid.
-  // Returns Failure if authorization returns 'Failure'.
-  process::Future<Option<Error>> validate(
-      const FrameworkInfo& frameworkInfo,
-      const process::UPID& from);
-};
+  hashset<Offer*> offers; // Active offers for framework.
 
+  hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors;
 
-// Implementation of slave admission Registrar operation.
-class AdmitSlave : public Operation
-{
-public:
-  explicit AdmitSlave(const SlaveInfo& _info) : info(_info)
-  {
-    CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
-  }
+  // NOTE: For the used and offered resources below, we keep the
+  // total as well as partitioned by SlaveID.
+  // We expose the total resources via the HTTP endpoint, and we
+  // keep a running total of the resources because looping over the
+  // slaves to sum the resources has led to perf issues (MESOS-1862).
+  // We keep the resources partitioned by SlaveID because non-scalar
+  // resources can be lost when summing them up across multiple
+  // slaves (MESOS-2373).
+  //
+  // Also note that keeping the totals is safe even though it yields
+  // incorrect results for non-scalar resources.
+  //   (1) For overlapping set items / ranges across slaves, these
+  //       will get added N times but only represented once.
+  //   (2) When an initial subtraction occurs (N-1), the resource is
+  //       no longer represented. (This is the source of the bug).
+  //   (3) When any further subtractions occur (N-(1+M)), the
+  //       Resources simply ignores the subtraction since there's
+  //       nothing to remove, so this is safe for now.
 
-protected:
-  virtual Try<bool> perform(
-      Registry* registry,
-      hashset<SlaveID>* slaveIDs,
-      bool strict)
-  {
-    // Check and see if this slave already exists.
-    if (slaveIDs->contains(info.id())) {
-      if (strict) {
-        return Error("Slave already admitted");
-      } else {
-        return false; // No mutation.
-      }
-    }
+  // TODO(mpark): Strip the non-scalar resources out of the totals
+  // in order to avoid reporting incorrect statistics (MESOS-2623).
 
-    Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
-    slave->mutable_info()->CopyFrom(info);
-    slaveIDs->insert(info.id());
-    return true; // Mutation.
-  }
+  // Active task / executor resources.
+  Resources totalUsedResources;
+  hashmap<SlaveID, Resources> usedResources;
+
+  // Offered resources.
+  Resources totalOfferedResources;
+  hashmap<SlaveID, Resources> offeredResources;
 
 private:
-  const SlaveInfo info;
+  Framework(const Framework&);              // No copying.
+  Framework& operator = (const Framework&); // No assigning.
 };
 
 
-// Implementation of slave readmission Registrar operation.
-class ReadmitSlave : public Operation
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const Framework& framework)
 {
-public:
-  explicit ReadmitSlave(const SlaveInfo& _info) : info(_info)
-  {
-    CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
-  }
-
-protected:
-  virtual Try<bool> perform(
-      Registry* registry,
-      hashset<SlaveID>* slaveIDs,
-      bool strict)
-  {
-    if (slaveIDs->contains(info.id())) {
-      return false; // No mutation.
-    }
+  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
+  // updated on framework failover (MESOS-1784).
+  return stream << framework.id() << " (" << framework.info.name()
+                << ") at " << framework.pid;
+}
 
-    if (strict) {
-      return Error("Slave not yet admitted");
-    } else {
-      Registry::Slave* slave = registry->mutable_slaves()->add_slaves();
-      slave->mutable_info()->CopyFrom(info);
-      slaveIDs->insert(info.id());
-      return true; // Mutation.
-    }
-  }
 
-private:
-  const SlaveInfo info;
-};
+// Information about an active role.
+struct Role
+{
+  explicit Role(const mesos::master::RoleInfo& _info)
+    : info(_info) {}
 
+  void addFramework(Framework* framework)
+  {
+    frameworks[framework->id()] = framework;
+  }
 
-// Implementation of slave removal Registrar operation.
-class RemoveSlave : public Operation
-{
-public:
-  explicit RemoveSlave(const SlaveInfo& _info) : info(_info)
+  void removeFramework(Framework* framework)
   {
-    CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field";
+    frameworks.erase(framework->id());
   }
 
-protected:
-  virtual Try<bool> perform(
-      Registry* registry,
-      hashset<SlaveID>* slaveIDs,
-      bool strict)
+  Resources resources() const
   {
-    for (int i = 0; i < registry->slaves().slaves().size(); i++) {
-      const Registry::Slave& slave = registry->slaves().slaves(i);
-      if (slave.info().id() == info.id()) {
-        registry->mutable_slaves()->mutable_slaves()->DeleteSubrange(i, 1);
-        slaveIDs->erase(info.id());
-        return true; // Mutation.
-      }
+    Resources resources;
+    foreachvalue (Framework* framework, frameworks) {
+      resources += framework->totalUsedResources;
+      resources += framework->totalOfferedResources;
     }
 
-    if (strict) {
-      return Error("Slave not yet admitted");
-    } else {
-      return false; // No mutation.
-    }
+    return resources;
   }
 
-private:
-  const SlaveInfo info;
+  mesos::master::RoleInfo info;
+
+  hashmap<FrameworkID, Framework*> frameworks;
 };
 
 } // namespace master {

Reply via email to