Repository: mesos
Updated Branches:
  refs/heads/master a5a131024 -> b32f1b6f3
Moved Framework struct down in master.hpp. Review: https://reviews.apache.org/r/36733 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b32f1b6f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b32f1b6f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b32f1b6f Branch: refs/heads/master Commit: b32f1b6f3915e3eee04992c816c86af3b165ea92 Parents: a5a1310 Author: Anand Mazumdar <[email protected]> Authored: Mon Jul 27 14:04:55 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Mon Jul 27 14:04:55 2015 -0700 ---------------------------------------------------------------------- src/master/master.hpp | 1582 ++++++++++++++++++++++---------------------- 1 file changed, 792 insertions(+), 790 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b32f1b6f/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 827d0d5..9f27f8b 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -89,6 +89,8 @@ class Repairer; class SlaveObserver; struct BoundedRateLimiter; +struct Framework; +struct Role; struct Slave @@ -330,319 +332,6 @@ inline std::ostream& operator << (std::ostream& stream, const Slave& slave) } -// Information about a connected or completed framework. -// TODO(bmahler): Keeping the task and executor information in sync -// across the Slave and Framework structs is error prone! 
-struct Framework -{ - Framework(const FrameworkInfo& _info, - const process::UPID& _pid, - const process::Time& time = process::Clock::now()) - : info(_info), - pid(_pid), - connected(true), - active(true), - registeredTime(time), - reregisteredTime(time), - completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {} - - ~Framework() {} - - Task* getTask(const TaskID& taskId) - { - if (tasks.count(taskId) > 0) { - return tasks[taskId]; - } else { - return NULL; - } - } - - void addTask(Task* task) - { - CHECK(!tasks.contains(task->task_id())) - << "Duplicate task " << task->task_id() - << " of framework " << task->framework_id(); - - tasks[task->task_id()] = task; - - if (!protobuf::isTerminalState(task->state())) { - totalUsedResources += task->resources(); - usedResources[task->slave_id()] += task->resources(); - } - } - - // Notification of task termination, for resource accounting. - // TODO(bmahler): This is a hack for performance. We need to - // maintain resource counters because computing task resources - // functionally for all tasks is expensive, for now. - void taskTerminated(Task* task) - { - CHECK(protobuf::isTerminalState(task->state())); - CHECK(tasks.contains(task->task_id())) - << "Unknown task " << task->task_id() - << " of framework " << task->framework_id(); - - totalUsedResources -= task->resources(); - usedResources[task->slave_id()] -= task->resources(); - if (usedResources[task->slave_id()].empty()) { - usedResources.erase(task->slave_id()); - } - } - - void addCompletedTask(const Task& task) - { - // TODO(adam-mesos): Check if completed task already exists. 
- completedTasks.push_back(std::shared_ptr<Task>(new Task(task))); - } - - void removeTask(Task* task) - { - CHECK(tasks.contains(task->task_id())) - << "Unknown task " << task->task_id() - << " of framework " << task->framework_id(); - - if (!protobuf::isTerminalState(task->state())) { - totalUsedResources -= task->resources(); - usedResources[task->slave_id()] -= task->resources(); - if (usedResources[task->slave_id()].empty()) { - usedResources.erase(task->slave_id()); - } - } - - addCompletedTask(*task); - - tasks.erase(task->task_id()); - } - - void addOffer(Offer* offer) - { - CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); - offers.insert(offer); - totalOfferedResources += offer->resources(); - offeredResources[offer->slave_id()] += offer->resources(); - } - - void removeOffer(Offer* offer) - { - CHECK(offers.find(offer) != offers.end()) - << "Unknown offer " << offer->id(); - - totalOfferedResources -= offer->resources(); - offeredResources[offer->slave_id()] -= offer->resources(); - if (offeredResources[offer->slave_id()].empty()) { - offeredResources.erase(offer->slave_id()); - } - - offers.erase(offer); - } - - bool hasExecutor(const SlaveID& slaveId, - const ExecutorID& executorId) - { - return executors.contains(slaveId) && - executors[slaveId].contains(executorId); - } - - void addExecutor(const SlaveID& slaveId, - const ExecutorInfo& executorInfo) - { - CHECK(!hasExecutor(slaveId, executorInfo.executor_id())) - << "Duplicate executor " << executorInfo.executor_id() - << " on slave " << slaveId; - - executors[slaveId][executorInfo.executor_id()] = executorInfo; - totalUsedResources += executorInfo.resources(); - usedResources[slaveId] += executorInfo.resources(); - } - - void removeExecutor(const SlaveID& slaveId, - const ExecutorID& executorId) - { - CHECK(hasExecutor(slaveId, executorId)) - << "Unknown executor " << executorId - << " of framework " << id() - << " of slave " << slaveId; - - totalUsedResources -= 
executors[slaveId][executorId].resources(); - usedResources[slaveId] -= executors[slaveId][executorId].resources(); - if (usedResources[slaveId].empty()) { - usedResources.erase(slaveId); - } - - executors[slaveId].erase(executorId); - if (executors[slaveId].empty()) { - executors.erase(slaveId); - } - } - - const FrameworkID id() const { return info.id(); } - - // Update fields in 'info' using those in 'source'. Currently this - // only updates 'name', 'failover_timeout', 'hostname', and - // 'webui_url'. - void updateFrameworkInfo(const FrameworkInfo& source) - { - // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because - // of MESOS-2559. Once this is fixed we can 'CHECK' that we only - // merge 'info' from the same framework 'id'. - - // TODO(jmlvanre): Merge other fields as per design doc in - // MESOS-703. - - if (source.user() != info.user()) { - LOG(WARNING) << "Can not update FrameworkInfo.user to '" << info.user() - << "' for framework " << id() << ". Check MESOS-703"; - } - - info.set_name(source.name()); - - if (source.has_failover_timeout()) { - info.set_failover_timeout(source.failover_timeout()); - } else { - info.clear_failover_timeout(); - } - - if (source.checkpoint() != info.checkpoint()) { - LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '" - << stringify(info.checkpoint()) << "' for framework " << id() - << ". Check MESOS-703"; - } - - if (source.role() != info.role()) { - LOG(WARNING) << "Can not update FrameworkInfo.role to '" << info.role() - << "' for framework " << id() << ". Check MESOS-703"; - } - - if (source.has_hostname()) { - info.set_hostname(source.hostname()); - } else { - info.clear_hostname(); - } - - if (source.principal() != info.principal()) { - LOG(WARNING) << "Can not update FrameworkInfo.principal to '" - << info.principal() << "' for framework " << id() - << ". 
Check MESOS-703"; - } - - if (source.has_webui_url()) { - info.set_webui_url(source.webui_url()); - } else { - info.clear_webui_url(); - } - } - - FrameworkInfo info; - - process::UPID pid; - - // Framework becomes disconnected when the socket closes. - bool connected; - - // Framework becomes deactivated when it is disconnected or - // the master receives a DeactivateFrameworkMessage. - // No offers will be made to a deactivated framework. - bool active; - - process::Time registeredTime; - process::Time reregisteredTime; - process::Time unregisteredTime; - - // Tasks that have not yet been launched because they are currently - // being authorized. - hashmap<TaskID, TaskInfo> pendingTasks; - - hashmap<TaskID, Task*> tasks; - - // NOTE: We use a shared pointer for Task because clang doesn't like - // Boost's implementation of circular_buffer with Task (Boost - // attempts to do some memset's which are unsafe). - boost::circular_buffer<std::shared_ptr<Task>> completedTasks; - - hashset<Offer*> offers; // Active offers for framework. - - hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors; - - // NOTE: For the used and offered resources below, we keep the - // total as well as partitioned by SlaveID. - // We expose the total resources via the HTTP endpoint, and we - // keep a running total of the resources because looping over the - // slaves to sum the resources has led to perf issues (MESOS-1862). - // We keep the resources partitioned by SlaveID because non-scalar - // resources can be lost when summing them up across multiple - // slaves (MESOS-2373). - // - // Also note that keeping the totals is safe even though it yields - // incorrect results for non-scalar resources. - // (1) For overlapping set items / ranges across slaves, these - // will get added N times but only represented once. - // (2) When an initial subtraction occurs (N-1), the resource is - // no longer represented. (This is the source of the bug). 
- // (3) When any further subtractions occur (N-(1+M)), the - // Resources simply ignores the subtraction since there's - // nothing to remove, so this is safe for now. - - // TODO(mpark): Strip the non-scalar resources out of the totals - // in order to avoid reporting incorrect statistics (MESOS-2623). - - // Active task / executor resources. - Resources totalUsedResources; - hashmap<SlaveID, Resources> usedResources; - - // Offered resources. - Resources totalOfferedResources; - hashmap<SlaveID, Resources> offeredResources; - -private: - Framework(const Framework&); // No copying. - Framework& operator = (const Framework&); // No assigning. -}; - - -inline std::ostream& operator << ( - std::ostream& stream, - const Framework& framework) -{ - // TODO(vinod): Also log the hostname once FrameworkInfo is properly - // updated on framework failover (MESOS-1784). - return stream << framework.id() << " (" << framework.info.name() - << ") at " << framework.pid; -} - - -// Information about an active role. -struct Role -{ - explicit Role(const mesos::master::RoleInfo& _info) - : info(_info) {} - - void addFramework(Framework* framework) - { - frameworks[framework->id()] = framework; - } - - void removeFramework(Framework* framework) - { - frameworks.erase(framework->id()); - } - - Resources resources() const - { - Resources resources; - foreachvalue (Framework* framework, frameworks) { - resources += framework->totalUsedResources; - resources += framework->totalOfferedResources; - } - - return resources; - } - - mesos::master::RoleInfo info; - - hashmap<FrameworkID, Framework*> frameworks; -}; - - class Master : public ProtobufProcess<Master> { public: @@ -908,617 +597,930 @@ protected: // executors and recover the resources. 
void removeFramework(Slave* slave, Framework* framework); - void disconnect(Framework* framework); - void deactivate(Framework* framework); + void disconnect(Framework* framework); + void deactivate(Framework* framework); + + void disconnect(Slave* slave); + void deactivate(Slave* slave); + + // Add a slave. + void addSlave( + Slave* slave, + const std::vector<Archive::Framework>& completedFrameworks = + std::vector<Archive::Framework>()); + + // Remove the slave from the registrar. Called when the slave + // does not re-register in time after a master failover. + Nothing removeSlave(const Registry::Slave& slave); + + // Remove the slave from the registrar and from the master's state. + // + // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved. + void removeSlave( + Slave* slave, + const std::string& message, + Option<process::metrics::Counter> reason = None()); + + void _removeSlave( + const SlaveInfo& slaveInfo, + const std::vector<StatusUpdate>& updates, + const process::Future<bool>& removed, + const std::string& message, + Option<process::metrics::Counter> reason = None()); + + // Authorizes the task. + // Returns true if task is authorized. + // Returns false if task is not authorized. + // Returns failure for transient authorization failures. + process::Future<bool> authorizeTask( + const TaskInfo& task, + Framework* framework); + + // Add the task and its executor (if not already running) to the + // framework and slave. Returns the resources consumed as a result, + // which includes resources for the task and its executor + // (if not already running). + Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave); + + // Transitions the task, and recovers resources if the task becomes + // terminal. + void updateTask(Task* task, const StatusUpdate& update); + + // Removes the task. + void removeTask(Task* task); + + // Remove an executor and recover its resources. 
+ void removeExecutor( + Slave* slave, + const FrameworkID& frameworkId, + const ExecutorID& executorId); + + // Updates slave's resources by applying the given operation. It + // also updates the allocator and sends a CheckpointResourcesMessage + // to the slave with slave's current checkpointed resources. + void applyOfferOperation( + Framework* framework, + Slave* slave, + const Offer::Operation& operation); + + // Forwards the update to the framework. + void forward( + const StatusUpdate& update, + const process::UPID& acknowledgee, + Framework* framework); + + // Remove an offer after specified timeout + void offerTimeout(const OfferID& offerId); + + // Remove an offer and optionally rescind the offer as well. + void removeOffer(Offer* offer, bool rescind = false); + + Framework* getFramework(const FrameworkID& frameworkId); + Offer* getOffer(const OfferID& offerId); + + FrameworkID newFrameworkId(); + OfferID newOfferId(); + SlaveID newSlaveId(); + + Option<Credentials> credentials; + +private: + void drop( + const process::UPID& from, + const scheduler::Call& call, + const std::string& message); + + void drop( + Framework* framework, + const Offer::Operation& operation, + const std::string& message); + + // Call handlers. 
+ void receive( + const process::UPID& from, + const scheduler::Call& call); + + void subscribe( + const process::UPID& from, + const scheduler::Call::Subscribe& subscribe); + + void accept( + Framework* framework, + const scheduler::Call::Accept& accept); + + void _accept( + const FrameworkID& frameworkId, + const SlaveID& slaveId, + const Resources& offeredResources, + const scheduler::Call::Accept& accept, + const process::Future<std::list<process::Future<bool>>>& authorizations); + + void decline( + Framework* framework, + const scheduler::Call::Decline& decline); + + void revive(Framework* framework); + + void kill( + Framework* framework, + const scheduler::Call::Kill& kill); + + void shutdown( + Framework* framework, + const scheduler::Call::Shutdown& shutdown); + + void acknowledge( + Framework* framework, + const scheduler::Call::Acknowledge& acknowledge); + + void reconcile( + Framework* framework, + const scheduler::Call::Reconcile& reconcile); + + void message( + Framework* framework, + const scheduler::Call::Message& message); + + void request( + Framework* framework, + const scheduler::Call::Request& request); + + bool elected() const + { + return leader.isSome() && leader.get() == info_; + } + + // Inner class used to namespace HTTP route handlers (see + // master/http.cpp for implementations). + class Http + { + public: + explicit Http(Master* _master) : master(_master) {} + + // Logs the request, route handlers can compose this with the + // desired request handler to get consistent request logging. 
+ static void log(const process::http::Request& request); + + // /master/call + process::Future<process::http::Response> call( + const process::http::Request& request) const; + + // /master/health + process::Future<process::http::Response> health( + const process::http::Request& request) const; + + // /master/observe + process::Future<process::http::Response> observe( + const process::http::Request& request) const; + + // /master/redirect + process::Future<process::http::Response> redirect( + const process::http::Request& request) const; + + // /master/roles.json + process::Future<process::http::Response> roles( + const process::http::Request& request) const; + + // /master/teardown and /master/shutdown (deprecated). + process::Future<process::http::Response> teardown( + const process::http::Request& request) const; + + // /master/slaves + process::Future<process::http::Response> slaves( + const process::http::Request& request) const; + + // /master/state.json + process::Future<process::http::Response> state( + const process::http::Request& request) const; + + // /master/state-summary + process::Future<process::http::Response> stateSummary( + const process::http::Request& request) const; + + // /master/tasks.json + process::Future<process::http::Response> tasks( + const process::http::Request& request) const; + + const static std::string CALL_HELP; + const static std::string HEALTH_HELP; + const static std::string OBSERVE_HELP; + const static std::string REDIRECT_HELP; + const static std::string ROLES_HELP; + const static std::string TEARDOWN_HELP; + const static std::string SLAVES_HELP; + const static std::string STATE_HELP; + const static std::string STATESUMMARY_HELP; + const static std::string TASKS_HELP; + + private: + // Helper for doing authentication, returns the credential used if + // the authentication was successful (or none if no credentials + // have been given to the master), otherwise an Error. 
+ Result<Credential> authenticate( + const process::http::Request& request) const; + + // Continuations. + process::Future<process::http::Response> _teardown( + const FrameworkID& id, + bool authorized = true) const; + + Master* master; + }; + + Master(const Master&); // No copying. + Master& operator = (const Master&); // No assigning. + + friend struct Metrics; + + // NOTE: Since 'getOffer' and 'slaves' are protected, + // we need to make the following functions friends. + friend Offer* validation::offer::getOffer( + Master* master, const OfferID& offerId); + + friend Slave* validation::offer::getSlave( + Master* master, const SlaveID& slaveId); + + const Flags flags; - void disconnect(Slave* slave); - void deactivate(Slave* slave); + Option<MasterInfo> leader; // Current leading master. - // Add a slave. - void addSlave( - Slave* slave, - const std::vector<Archive::Framework>& completedFrameworks = - std::vector<Archive::Framework>()); + mesos::master::allocator::Allocator* allocator; + WhitelistWatcher* whitelistWatcher; + Registrar* registrar; + Repairer* repairer; + Files* files; - // Remove the slave from the registrar. Called when the slave - // does not re-register in time after a master failover. - Nothing removeSlave(const Registry::Slave& slave); + MasterContender* contender; + MasterDetector* detector; - // Remove the slave from the registrar and from the master's state. - // - // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved. - void removeSlave( - Slave* slave, - const std::string& message, - Option<process::metrics::Counter> reason = None()); + const Option<Authorizer*> authorizer; - void _removeSlave( - const SlaveInfo& slaveInfo, - const std::vector<StatusUpdate>& updates, - const process::Future<bool>& removed, - const std::string& message, - Option<process::metrics::Counter> reason = None()); + MasterInfo info_; - // Authorizes the task. - // Returns true if task is authorized. - // Returns false if task is not authorized. 
- // Returns failure for transient authorization failures. - process::Future<bool> authorizeTask( - const TaskInfo& task, - Framework* framework); + // Indicates when recovery is complete. Recovery begins once the + // master is elected as a leader. + Option<process::Future<Nothing>> recovered; - // Add the task and its executor (if not already running) to the - // framework and slave. Returns the resources consumed as a result, - // which includes resources for the task and its executor - // (if not already running). - Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave); + struct Slaves + { + Slaves() : removed(MAX_REMOVED_SLAVES) {} - // Transitions the task, and recovers resources if the task becomes - // terminal. - void updateTask(Task* task, const StatusUpdate& update); + // Imposes a time limit for slaves that we recover from the + // registry to re-register with the master. + Option<process::Timer> recoveredTimer; - // Removes the task. - void removeTask(Task* task); + // Slaves that have been recovered from the registrar but have yet + // to re-register. We keep a "reregistrationTimer" above to ensure + // we remove these slaves if they do not re-register. + hashset<SlaveID> recovered; - // Remove an executor and recover its resources. - void removeExecutor( - Slave* slave, - const FrameworkID& frameworkId, - const ExecutorID& executorId); + // Slaves that are in the process of registering. + hashset<process::UPID> registering; - // Updates slave's resources by applying the given operation. It - // also updates the allocator and sends a CheckpointResourcesMessage - // to the slave with slave's current checkpointed resources. - void applyOfferOperation( - Framework* framework, - Slave* slave, - const Offer::Operation& operation); + // Only those slaves that are re-registering for the first time + // with this master. We must not answer questions related to + // these slaves until the registrar determines their fate. 
+ hashset<SlaveID> reregistering; - // Forwards the update to the framework. - void forward( - const StatusUpdate& update, - const process::UPID& acknowledgee, - Framework* framework); + // Registered slaves are indexed by SlaveID and UPID. Note that + // iteration is supported but is exposed as iteration over a + // hashmap<SlaveID, Slave*> since it is tedious to convert + // the map's key/value iterator into a value iterator. + // + // TODO(bmahler): Consider pulling in boost's multi_index, + // or creating a simpler indexing abstraction in stout. + struct + { + bool contains(const SlaveID& slaveId) const + { + return ids.contains(slaveId); + } - // Remove an offer after specified timeout - void offerTimeout(const OfferID& offerId); + bool contains(const process::UPID& pid) const + { + return pids.contains(pid); + } - // Remove an offer and optionally rescind the offer as well. - void removeOffer(Offer* offer, bool rescind = false); + Slave* get(const SlaveID& slaveId) const + { + return ids.get(slaveId).getOrElse(NULL); + } - Framework* getFramework(const FrameworkID& frameworkId); - Offer* getOffer(const OfferID& offerId); + Slave* get(const process::UPID& pid) const + { + return pids.get(pid).getOrElse(NULL); + } - FrameworkID newFrameworkId(); - OfferID newOfferId(); - SlaveID newSlaveId(); + void put(Slave* slave) + { + CHECK_NOTNULL(slave); + ids[slave->id] = slave; + pids[slave->pid] = slave; + } - Option<Credentials> credentials; + void remove(Slave* slave) + { + CHECK_NOTNULL(slave); + ids.erase(slave->id); + pids.erase(slave->pid); + } -private: - void drop( - const process::UPID& from, - const scheduler::Call& call, - const std::string& message); + void clear() + { + ids.clear(); + pids.clear(); + } - void drop( - Framework* framework, - const Offer::Operation& operation, - const std::string& message); + size_t size() const { return ids.size(); } - // Call handlers. 
- void receive( - const process::UPID& from, - const scheduler::Call& call); + typedef hashmap<SlaveID, Slave*>::iterator iterator; + typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator; - void subscribe( - const process::UPID& from, - const scheduler::Call::Subscribe& subscribe); + iterator begin() { return ids.begin(); } + iterator end() { return ids.end(); } - void accept( - Framework* framework, - const scheduler::Call::Accept& accept); + const_iterator begin() const { return ids.begin(); } + const_iterator end() const { return ids.end(); } - void _accept( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& offeredResources, - const scheduler::Call::Accept& accept, - const process::Future<std::list<process::Future<bool>>>& authorizations); + private: + hashmap<SlaveID, Slave*> ids; + hashmap<process::UPID, Slave*> pids; + } registered; - void decline( - Framework* framework, - const scheduler::Call::Decline& decline); + // Slaves that are in the process of being removed from the + // registrar. Think of these as being partially removed: we must + // not answer questions related to these until they are removed + // from the registry. + hashset<SlaveID> removing; - void revive(Framework* framework); + // We track removed slaves to preserve the consistency + // semantics of the pre-registrar code when a non-strict registrar + // is being used. That is, if we remove a slave, we must make + // an effort to prevent it from (re-)registering, sending updates, + // etc. We keep a cache here to prevent this from growing in an + // unbounded manner. + // TODO(bmahler): Ideally we could use a cache with set semantics. + Cache<SlaveID, Nothing> removed; - void kill( - Framework* framework, - const scheduler::Call::Kill& kill); + // This rate limiter is used to limit the removal of slaves failing + // health checks. 
+ // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is + // a wrapper around libprocess process which is thread safe. + Option<std::shared_ptr<process::RateLimiter>> limiter; - void shutdown( - Framework* framework, - const scheduler::Call::Shutdown& shutdown); + bool transitioning(const Option<SlaveID>& slaveId) + { + if (slaveId.isSome()) { + return recovered.contains(slaveId.get()) || + reregistering.contains(slaveId.get()) || + removing.contains(slaveId.get()); + } else { + return !recovered.empty() || + !reregistering.empty() || + !removing.empty(); + } + } + } slaves; - void acknowledge( - Framework* framework, - const scheduler::Call::Acknowledge& acknowledge); + struct Frameworks + { + Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {} - void reconcile( - Framework* framework, - const scheduler::Call::Reconcile& reconcile); + hashmap<FrameworkID, Framework*> registered; + boost::circular_buffer<std::shared_ptr<Framework>> completed; - void message( - Framework* framework, - const scheduler::Call::Message& message); + // Principals of frameworks keyed by PID. + // NOTE: Multiple PIDs can map to the same principal. The + // principal is None when the framework doesn't specify it. + // The differences between this map and 'authenticated' are: + // 1) This map only includes *registered* frameworks. The mapping + // is added when a framework (re-)registers. + // 2) This map includes unauthenticated frameworks (when Master + // allows them) if they have principals specified in + // FrameworkInfo. + hashmap<process::UPID, Option<std::string>> principals; - void request( - Framework* framework, - const scheduler::Call::Request& request); + // BoundedRateLimiters keyed by the framework principal. + // Like Metrics::Frameworks, all frameworks of the same principal + // are throttled together at a common rate limit. 
+ hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters; - bool elected() const - { - return leader.isSome() && leader.get() == info_; - } + // The default limiter is for frameworks not specified in + // 'flags.rate_limits'. + Option<process::Owned<BoundedRateLimiter>> defaultLimiter; + } frameworks; - // Inner class used to namespace HTTP route handlers (see - // master/http.cpp for implementations). - class Http - { - public: - explicit Http(Master* _master) : master(_master) {} + hashmap<OfferID, Offer*> offers; + hashmap<OfferID, process::Timer> offerTimers; - // Logs the request, route handlers can compose this with the - // desired request handler to get consistent request logging. - static void log(const process::http::Request& request); + hashmap<std::string, Role*> roles; - // /master/call - process::Future<process::http::Response> call( - const process::http::Request& request) const; + // Authenticator names as supplied via flags. + std::vector<std::string> authenticatorNames; - // /master/health - process::Future<process::http::Response> health( - const process::http::Request& request) const; + Option<Authenticator*> authenticator; - // /master/observe - process::Future<process::http::Response> observe( - const process::http::Request& request) const; + // Frameworks/slaves that are currently in the process of authentication. + // 'authenticating' future is completed when authenticator + // completes authentication. + // The future is removed from the map when master completes authentication. + hashmap<process::UPID, process::Future<Option<std::string>>> authenticating; - // /master/redirect - process::Future<process::http::Response> redirect( - const process::http::Request& request) const; + // Principals of authenticated frameworks/slaves keyed by PID. 
+ hashmap<process::UPID, std::string> authenticated; - // /master/roles.json - process::Future<process::http::Response> roles( - const process::http::Request& request) const; + int64_t nextFrameworkId; // Used to give each framework a unique ID. + int64_t nextOfferId; // Used to give each slot offer a unique ID. + int64_t nextSlaveId; // Used to give each slave a unique ID. - // /master/teardown and /master/shutdown (deprecated). - process::Future<process::http::Response> teardown( - const process::http::Request& request) const; + // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is + // thread safe. + // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains + // copyable metric types only. + std::shared_ptr<Metrics> metrics; - // /master/slaves - process::Future<process::http::Response> slaves( - const process::http::Request& request) const; + // Gauge handlers. + double _uptime_secs() + { + return (process::Clock::now() - startTime).secs(); + } - // /master/state.json - process::Future<process::http::Response> state( - const process::http::Request& request) const; + double _elected() + { + return elected() ? 
1 : 0; + } - // /master/state-summary - process::Future<process::http::Response> stateSummary( - const process::http::Request& request) const; + double _slaves_connected(); + double _slaves_disconnected(); + double _slaves_active(); + double _slaves_inactive(); - // /master/tasks.json - process::Future<process::http::Response> tasks( - const process::http::Request& request) const; + double _frameworks_connected(); + double _frameworks_disconnected(); + double _frameworks_active(); + double _frameworks_inactive(); - const static std::string CALL_HELP; - const static std::string HEALTH_HELP; - const static std::string OBSERVE_HELP; - const static std::string REDIRECT_HELP; - const static std::string ROLES_HELP; - const static std::string TEARDOWN_HELP; - const static std::string SLAVES_HELP; - const static std::string STATE_HELP; - const static std::string STATESUMMARY_HELP; - const static std::string TASKS_HELP; + double _outstanding_offers() + { + return offers.size(); + } - private: - // Helper for doing authentication, returns the credential used if - // the authentication was successful (or none if no credentials - // have been given to the master), otherwise an Error. - Result<Credential> authenticate( - const process::http::Request& request) const; + double _event_queue_messages() + { + return static_cast<double>(eventCount<process::MessageEvent>()); + } - // Continuations. - process::Future<process::http::Response> _teardown( - const FrameworkID& id, - bool authorized = true) const; + double _event_queue_dispatches() + { + return static_cast<double>(eventCount<process::DispatchEvent>()); + } - Master* master; - }; + double _event_queue_http_requests() + { + return static_cast<double>(eventCount<process::HttpEvent>()); + } - Master(const Master&); // No copying. - Master& operator = (const Master&); // No assigning. 
+ double _tasks_staging(); + double _tasks_starting(); + double _tasks_running(); - friend struct Metrics; + double _resources_total(const std::string& name); + double _resources_used(const std::string& name); + double _resources_percent(const std::string& name); - // NOTE: Since 'getOffer' and 'slaves' are protected, - // we need to make the following functions friends. - friend Offer* validation::offer::getOffer( - Master* master, const OfferID& offerId); + double _resources_revocable_total(const std::string& name); + double _resources_revocable_used(const std::string& name); + double _resources_revocable_percent(const std::string& name); - friend Slave* validation::offer::getSlave( - Master* master, const SlaveID& slaveId); + process::Time startTime; // Start time used to calculate uptime. - const Flags flags; + Option<process::Time> electedTime; // Time when this master is elected. - Option<MasterInfo> leader; // Current leading master. + // Validates the framework including authorization. + // Returns None if the framework is valid. + // Returns Error if the framework is invalid. + // Returns Failure if authorization returns 'Failure'. + process::Future<Option<Error>> validate( + const FrameworkInfo& frameworkInfo, + const process::UPID& from); +}; - mesos::master::allocator::Allocator* allocator; - WhitelistWatcher* whitelistWatcher; - Registrar* registrar; - Repairer* repairer; - Files* files; - MasterContender* contender; - MasterDetector* detector; +// Implementation of slave admission Registrar operation. +class AdmitSlave : public Operation +{ +public: + explicit AdmitSlave(const SlaveInfo& _info) : info(_info) + { + CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; + } - const Option<Authorizer*> authorizer; +protected: + virtual Try<bool> perform( + Registry* registry, + hashset<SlaveID>* slaveIDs, + bool strict) + { + // Check and see if this slave already exists. 
+ if (slaveIDs->contains(info.id())) { + if (strict) { + return Error("Slave already admitted"); + } else { + return false; // No mutation. + } + } - MasterInfo info_; + Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); + slave->mutable_info()->CopyFrom(info); + slaveIDs->insert(info.id()); + return true; // Mutation. + } - // Indicates when recovery is complete. Recovery begins once the - // master is elected as a leader. - Option<process::Future<Nothing>> recovered; +private: + const SlaveInfo info; +}; - struct Slaves - { - Slaves() : removed(MAX_REMOVED_SLAVES) {} - // Imposes a time limit for slaves that we recover from the - // registry to re-register with the master. - Option<process::Timer> recoveredTimer; +// Implementation of slave readmission Registrar operation. +class ReadmitSlave : public Operation +{ +public: + explicit ReadmitSlave(const SlaveInfo& _info) : info(_info) + { + CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; + } - // Slaves that have been recovered from the registrar but have yet - // to re-register. We keep a "reregistrationTimer" above to ensure - // we remove these slaves if they do not re-register. - hashset<SlaveID> recovered; +protected: + virtual Try<bool> perform( + Registry* registry, + hashset<SlaveID>* slaveIDs, + bool strict) + { + if (slaveIDs->contains(info.id())) { + return false; // No mutation. + } - // Slaves that are in the process of registering. - hashset<process::UPID> registering; + if (strict) { + return Error("Slave not yet admitted"); + } else { + Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); + slave->mutable_info()->CopyFrom(info); + slaveIDs->insert(info.id()); + return true; // Mutation. + } + } - // Only those slaves that are re-registering for the first time - // with this master. We must not answer questions related to - // these slaves until the registrar determines their fate. 
- hashset<SlaveID> reregistering; +private: + const SlaveInfo info; +}; - // Registered slaves are indexed by SlaveID and UPID. Note that - // iteration is supported but is exposed as iteration over a - // hashmap<SlaveID, Slave*> since it is tedious to convert - // the map's key/value iterator into a value iterator. - // - // TODO(bmahler): Consider pulling in boost's multi_index, - // or creating a simpler indexing abstraction in stout. - struct - { - bool contains(const SlaveID& slaveId) const - { - return ids.contains(slaveId); - } - bool contains(const process::UPID& pid) const - { - return pids.contains(pid); - } +// Implementation of slave removal Registrar operation. +class RemoveSlave : public Operation +{ +public: + explicit RemoveSlave(const SlaveInfo& _info) : info(_info) + { + CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; + } - Slave* get(const SlaveID& slaveId) const - { - return ids.get(slaveId).getOrElse(NULL); +protected: + virtual Try<bool> perform( + Registry* registry, + hashset<SlaveID>* slaveIDs, + bool strict) + { + for (int i = 0; i < registry->slaves().slaves().size(); i++) { + const Registry::Slave& slave = registry->slaves().slaves(i); + if (slave.info().id() == info.id()) { + registry->mutable_slaves()->mutable_slaves()->DeleteSubrange(i, 1); + slaveIDs->erase(info.id()); + return true; // Mutation. } + } - Slave* get(const process::UPID& pid) const - { - return pids.get(pid).getOrElse(NULL); - } + if (strict) { + return Error("Slave not yet admitted"); + } else { + return false; // No mutation. + } + } - void put(Slave* slave) - { - CHECK_NOTNULL(slave); - ids[slave->id] = slave; - pids[slave->pid] = slave; - } +private: + const SlaveInfo info; +}; - void remove(Slave* slave) - { - CHECK_NOTNULL(slave); - ids.erase(slave->id); - pids.erase(slave->pid); - } - void clear() - { - ids.clear(); - pids.clear(); - } +// Information about a connected or completed framework. 
+// TODO(bmahler): Keeping the task and executor information in sync +// across the Slave and Framework structs is error prone! +struct Framework +{ + Framework(const FrameworkInfo& _info, + const process::UPID& _pid, + const process::Time& time = process::Clock::now()) + : info(_info), + pid(_pid), + connected(true), + active(true), + registeredTime(time), + reregisteredTime(time), + completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {} - size_t size() const { return ids.size(); } + ~Framework() {} - typedef hashmap<SlaveID, Slave*>::iterator iterator; - typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator; + Task* getTask(const TaskID& taskId) + { + if (tasks.count(taskId) > 0) { + return tasks[taskId]; + } else { + return NULL; + } + } - iterator begin() { return ids.begin(); } - iterator end() { return ids.end(); } + void addTask(Task* task) + { + CHECK(!tasks.contains(task->task_id())) + << "Duplicate task " << task->task_id() + << " of framework " << task->framework_id(); - const_iterator begin() const { return ids.begin(); } - const_iterator end() const { return ids.end(); } + tasks[task->task_id()] = task; - private: - hashmap<SlaveID, Slave*> ids; - hashmap<process::UPID, Slave*> pids; - } registered; + if (!protobuf::isTerminalState(task->state())) { + totalUsedResources += task->resources(); + usedResources[task->slave_id()] += task->resources(); + } + } - // Slaves that are in the process of being removed from the - // registrar. Think of these as being partially removed: we must - // not answer questions related to these until they are removed - // from the registry. - hashset<SlaveID> removing; + // Notification of task termination, for resource accounting. + // TODO(bmahler): This is a hack for performance. We need to + // maintain resource counters because computing task resources + // functionally for all tasks is expensive, for now. 
+ void taskTerminated(Task* task) + { + CHECK(protobuf::isTerminalState(task->state())); + CHECK(tasks.contains(task->task_id())) + << "Unknown task " << task->task_id() + << " of framework " << task->framework_id(); - // We track removed slaves to preserve the consistency - // semantics of the pre-registrar code when a non-strict registrar - // is being used. That is, if we remove a slave, we must make - // an effort to prevent it from (re-)registering, sending updates, - // etc. We keep a cache here to prevent this from growing in an - // unbounded manner. - // TODO(bmahler): Ideally we could use a cache with set semantics. - Cache<SlaveID, Nothing> removed; + totalUsedResources -= task->resources(); + usedResources[task->slave_id()] -= task->resources(); + if (usedResources[task->slave_id()].empty()) { + usedResources.erase(task->slave_id()); + } + } - // This rate limiter is used to limit the removal of slaves failing - // health checks. - // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is - // a wrapper around libprocess process which is thread safe. - Option<std::shared_ptr<process::RateLimiter>> limiter; + void addCompletedTask(const Task& task) + { + // TODO(adam-mesos): Check if completed task already exists. 
+ completedTasks.push_back(std::shared_ptr<Task>(new Task(task))); + } - bool transitioning(const Option<SlaveID>& slaveId) - { - if (slaveId.isSome()) { - return recovered.contains(slaveId.get()) || - reregistering.contains(slaveId.get()) || - removing.contains(slaveId.get()); - } else { - return !recovered.empty() || - !reregistering.empty() || - !removing.empty(); + void removeTask(Task* task) + { + CHECK(tasks.contains(task->task_id())) + << "Unknown task " << task->task_id() + << " of framework " << task->framework_id(); + + if (!protobuf::isTerminalState(task->state())) { + totalUsedResources -= task->resources(); + usedResources[task->slave_id()] -= task->resources(); + if (usedResources[task->slave_id()].empty()) { + usedResources.erase(task->slave_id()); } } - } slaves; - struct Frameworks + addCompletedTask(*task); + + tasks.erase(task->task_id()); + } + + void addOffer(Offer* offer) { - Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {} + CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); + offers.insert(offer); + totalOfferedResources += offer->resources(); + offeredResources[offer->slave_id()] += offer->resources(); + } - hashmap<FrameworkID, Framework*> registered; - boost::circular_buffer<std::shared_ptr<Framework>> completed; + void removeOffer(Offer* offer) + { + CHECK(offers.find(offer) != offers.end()) + << "Unknown offer " << offer->id(); - // Principals of frameworks keyed by PID. - // NOTE: Multiple PIDs can map to the same principal. The - // principal is None when the framework doesn't specify it. - // The differences between this map and 'authenticated' are: - // 1) This map only includes *registered* frameworks. The mapping - // is added when a framework (re-)registers. - // 2) This map includes unauthenticated frameworks (when Master - // allows them) if they have principals specified in - // FrameworkInfo. 
- hashmap<process::UPID, Option<std::string>> principals; + totalOfferedResources -= offer->resources(); + offeredResources[offer->slave_id()] -= offer->resources(); + if (offeredResources[offer->slave_id()].empty()) { + offeredResources.erase(offer->slave_id()); + } - // BoundedRateLimiters keyed by the framework principal. - // Like Metrics::Frameworks, all frameworks of the same principal - // are throttled together at a common rate limit. - hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters; + offers.erase(offer); + } - // The default limiter is for frameworks not specified in - // 'flags.rate_limits'. - Option<process::Owned<BoundedRateLimiter>> defaultLimiter; - } frameworks; + bool hasExecutor(const SlaveID& slaveId, + const ExecutorID& executorId) + { + return executors.contains(slaveId) && + executors[slaveId].contains(executorId); + } - hashmap<OfferID, Offer*> offers; - hashmap<OfferID, process::Timer> offerTimers; + void addExecutor(const SlaveID& slaveId, + const ExecutorInfo& executorInfo) + { + CHECK(!hasExecutor(slaveId, executorInfo.executor_id())) + << "Duplicate executor " << executorInfo.executor_id() + << " on slave " << slaveId; - hashmap<std::string, Role*> roles; + executors[slaveId][executorInfo.executor_id()] = executorInfo; + totalUsedResources += executorInfo.resources(); + usedResources[slaveId] += executorInfo.resources(); + } - // Authenticator names as supplied via flags. 
- std::vector<std::string> authenticatorNames; + void removeExecutor(const SlaveID& slaveId, + const ExecutorID& executorId) + { + CHECK(hasExecutor(slaveId, executorId)) + << "Unknown executor " << executorId + << " of framework " << id() + << " of slave " << slaveId; - Option<Authenticator*> authenticator; + totalUsedResources -= executors[slaveId][executorId].resources(); + usedResources[slaveId] -= executors[slaveId][executorId].resources(); + if (usedResources[slaveId].empty()) { + usedResources.erase(slaveId); + } - // Frameworks/slaves that are currently in the process of authentication. - // 'authenticating' future is completed when authenticator - // completes authentication. - // The future is removed from the map when master completes authentication. - hashmap<process::UPID, process::Future<Option<std::string>>> authenticating; + executors[slaveId].erase(executorId); + if (executors[slaveId].empty()) { + executors.erase(slaveId); + } + } + + const FrameworkID id() const { return info.id(); } + + // Update fields in 'info' using those in 'source'. Currently this + // only updates 'name', 'failover_timeout', 'hostname', and + // 'webui_url'. + void updateFrameworkInfo(const FrameworkInfo& source) + { + // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because + // of MESOS-2559. Once this is fixed we can 'CHECK' that we only + // merge 'info' from the same framework 'id'. + + // TODO(jmlvanre): Merge other fields as per design doc in + // MESOS-703. - // Principals of authenticated frameworks/slaves keyed by PID. - hashmap<process::UPID, std::string> authenticated; + if (source.user() != info.user()) { + LOG(WARNING) << "Can not update FrameworkInfo.user to '" << info.user() + << "' for framework " << id() << ". Check MESOS-703"; + } - int64_t nextFrameworkId; // Used to give each framework a unique ID. - int64_t nextOfferId; // Used to give each slot offer a unique ID. - int64_t nextSlaveId; // Used to give each slave a unique ID. 
+ info.set_name(source.name()); - // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is - // thread safe. - // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains - // copyable metric types only. - std::shared_ptr<Metrics> metrics; + if (source.has_failover_timeout()) { + info.set_failover_timeout(source.failover_timeout()); + } else { + info.clear_failover_timeout(); + } - // Gauge handlers. - double _uptime_secs() - { - return (process::Clock::now() - startTime).secs(); - } + if (source.checkpoint() != info.checkpoint()) { + LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '" + << stringify(info.checkpoint()) << "' for framework " << id() + << ". Check MESOS-703"; + } - double _elected() - { - return elected() ? 1 : 0; - } + if (source.role() != info.role()) { + LOG(WARNING) << "Can not update FrameworkInfo.role to '" << info.role() + << "' for framework " << id() << ". Check MESOS-703"; + } - double _slaves_connected(); - double _slaves_disconnected(); - double _slaves_active(); - double _slaves_inactive(); + if (source.has_hostname()) { + info.set_hostname(source.hostname()); + } else { + info.clear_hostname(); + } - double _frameworks_connected(); - double _frameworks_disconnected(); - double _frameworks_active(); - double _frameworks_inactive(); + if (source.principal() != info.principal()) { + LOG(WARNING) << "Can not update FrameworkInfo.principal to '" + << info.principal() << "' for framework " << id() + << ". 
Check MESOS-703"; + } - double _outstanding_offers() - { - return offers.size(); + if (source.has_webui_url()) { + info.set_webui_url(source.webui_url()); + } else { + info.clear_webui_url(); + } } - double _event_queue_messages() - { - return static_cast<double>(eventCount<process::MessageEvent>()); - } + FrameworkInfo info; - double _event_queue_dispatches() - { - return static_cast<double>(eventCount<process::DispatchEvent>()); - } + process::UPID pid; - double _event_queue_http_requests() - { - return static_cast<double>(eventCount<process::HttpEvent>()); - } + // Framework becomes disconnected when the socket closes. + bool connected; - double _tasks_staging(); - double _tasks_starting(); - double _tasks_running(); + // Framework becomes deactivated when it is disconnected or + // the master receives a DeactivateFrameworkMessage. + // No offers will be made to a deactivated framework. + bool active; - double _resources_total(const std::string& name); - double _resources_used(const std::string& name); - double _resources_percent(const std::string& name); + process::Time registeredTime; + process::Time reregisteredTime; + process::Time unregisteredTime; - double _resources_revocable_total(const std::string& name); - double _resources_revocable_used(const std::string& name); - double _resources_revocable_percent(const std::string& name); + // Tasks that have not yet been launched because they are currently + // being authorized. + hashmap<TaskID, TaskInfo> pendingTasks; - process::Time startTime; // Start time used to calculate uptime. + hashmap<TaskID, Task*> tasks; - Option<process::Time> electedTime; // Time when this master is elected. + // NOTE: We use a shared pointer for Task because clang doesn't like + // Boost's implementation of circular_buffer with Task (Boost + // attempts to do some memset's which are unsafe). + boost::circular_buffer<std::shared_ptr<Task>> completedTasks; - // Validates the framework including authorization. 
- // Returns None if the framework is valid. - // Returns Error if the framework is invalid. - // Returns Failure if authorization returns 'Failure'. - process::Future<Option<Error>> validate( - const FrameworkInfo& frameworkInfo, - const process::UPID& from); -}; + hashset<Offer*> offers; // Active offers for framework. + hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors; -// Implementation of slave admission Registrar operation. -class AdmitSlave : public Operation -{ -public: - explicit AdmitSlave(const SlaveInfo& _info) : info(_info) - { - CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; - } + // NOTE: For the used and offered resources below, we keep the + // total as well as partitioned by SlaveID. + // We expose the total resources via the HTTP endpoint, and we + // keep a running total of the resources because looping over the + // slaves to sum the resources has led to perf issues (MESOS-1862). + // We keep the resources partitioned by SlaveID because non-scalar + // resources can be lost when summing them up across multiple + // slaves (MESOS-2373). + // + // Also note that keeping the totals is safe even though it yields + // incorrect results for non-scalar resources. + // (1) For overlapping set items / ranges across slaves, these + // will get added N times but only represented once. + // (2) When an initial subtraction occurs (N-1), the resource is + // no longer represented. (This is the source of the bug). + // (3) When any further subtractions occur (N-(1+M)), the + // Resources simply ignores the subtraction since there's + // nothing to remove, so this is safe for now. -protected: - virtual Try<bool> perform( - Registry* registry, - hashset<SlaveID>* slaveIDs, - bool strict) - { - // Check and see if this slave already exists. - if (slaveIDs->contains(info.id())) { - if (strict) { - return Error("Slave already admitted"); - } else { - return false; // No mutation. 
- } - } + // TODO(mpark): Strip the non-scalar resources out of the totals + // in order to avoid reporting incorrect statistics (MESOS-2623). - Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); - slave->mutable_info()->CopyFrom(info); - slaveIDs->insert(info.id()); - return true; // Mutation. - } + // Active task / executor resources. + Resources totalUsedResources; + hashmap<SlaveID, Resources> usedResources; + + // Offered resources. + Resources totalOfferedResources; + hashmap<SlaveID, Resources> offeredResources; private: - const SlaveInfo info; + Framework(const Framework&); // No copying. + Framework& operator = (const Framework&); // No assigning. }; -// Implementation of slave readmission Registrar operation. -class ReadmitSlave : public Operation +inline std::ostream& operator << ( + std::ostream& stream, + const Framework& framework) { -public: - explicit ReadmitSlave(const SlaveInfo& _info) : info(_info) - { - CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; - } - -protected: - virtual Try<bool> perform( - Registry* registry, - hashset<SlaveID>* slaveIDs, - bool strict) - { - if (slaveIDs->contains(info.id())) { - return false; // No mutation. - } + // TODO(vinod): Also log the hostname once FrameworkInfo is properly + // updated on framework failover (MESOS-1784). + return stream << framework.id() << " (" << framework.info.name() + << ") at " << framework.pid; +} - if (strict) { - return Error("Slave not yet admitted"); - } else { - Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); - slave->mutable_info()->CopyFrom(info); - slaveIDs->insert(info.id()); - return true; // Mutation. - } - } -private: - const SlaveInfo info; -}; +// Information about an active role. +struct Role +{ + explicit Role(const mesos::master::RoleInfo& _info) + : info(_info) {} + void addFramework(Framework* framework) + { + frameworks[framework->id()] = framework; + } -// Implementation of slave removal Registrar operation. 
-class RemoveSlave : public Operation -{ -public: - explicit RemoveSlave(const SlaveInfo& _info) : info(_info) + void removeFramework(Framework* framework) { - CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; + frameworks.erase(framework->id()); } -protected: - virtual Try<bool> perform( - Registry* registry, - hashset<SlaveID>* slaveIDs, - bool strict) + Resources resources() const { - for (int i = 0; i < registry->slaves().slaves().size(); i++) { - const Registry::Slave& slave = registry->slaves().slaves(i); - if (slave.info().id() == info.id()) { - registry->mutable_slaves()->mutable_slaves()->DeleteSubrange(i, 1); - slaveIDs->erase(info.id()); - return true; // Mutation. - } + Resources resources; + foreachvalue (Framework* framework, frameworks) { + resources += framework->totalUsedResources; + resources += framework->totalOfferedResources; } - if (strict) { - return Error("Slave not yet admitted"); - } else { - return false; // No mutation. - } + return resources; } -private: - const SlaveInfo info; + mesos::master::RoleInfo info; + + hashmap<FrameworkID, Framework*> frameworks; }; } // namespace master {
