Moved up Slave and Framework structs in master.hpp. Review: https://reviews.apache.org/r/34387
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b19ffd2f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b19ffd2f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b19ffd2f Branch: refs/heads/master Commit: b19ffd2fe72d50fbd9f62c4c32eaa0906b2bd177 Parents: 26091f4 Author: Benjamin Mahler <[email protected]> Authored: Mon May 18 18:17:59 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Tue May 19 11:55:30 2015 -0700 ---------------------------------------------------------------------- src/master/master.hpp | 1951 ++++++++++++++++++++++---------------------- 1 file changed, 974 insertions(+), 977 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b19ffd2f/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index da0a835..0922a7c 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -89,1228 +89,1225 @@ class Repairer; class SlaveObserver; struct BoundedRateLimiter; -struct Framework; -struct Role; -struct Slave; -class Master : public ProtobufProcess<Master> +struct Slave { -public: - Master(mesos::master::allocator::Allocator* allocator, - Registrar* registrar, - Repairer* repairer, - Files* files, - MasterContender* contender, - MasterDetector* detector, - const Option<Authorizer*>& authorizer, - const Option<std::shared_ptr<process::RateLimiter>>& - slaveRemovalLimiter, - const Flags& flags = Flags()); - - virtual ~Master(); - - // Message handlers. - void submitScheduler( - const std::string& name); - - void registerFramework( - const process::UPID& from, - const FrameworkInfo& frameworkInfo); - - void reregisterFramework( - const process::UPID& from, - const FrameworkInfo& frameworkInfo, - bool failover); - - void unregisterFramework( - const process::UPID& from, - const FrameworkID& frameworkId); - - void deactivateFramework( - const process::UPID& from, - const FrameworkID& frameworkId); - - // TODO(vinod): Remove this once the old driver is removed. - void resourceRequest( - const process::UPID& from, - const FrameworkID& frameworkId, - const std::vector<Request>& requests); - - void launchTasks( - const process::UPID& from, - const FrameworkID& frameworkId, - const std::vector<TaskInfo>& tasks, - const Filters& filters, - const std::vector<OfferID>& offerIds); - - void reviveOffers( - const process::UPID& from, - const FrameworkID& frameworkId); - - void killTask( - const process::UPID& from, - const FrameworkID& frameworkId, - const TaskID& taskId); - - void statusUpdateAcknowledgement( - const process::UPID& from, - const SlaveID& slaveId, - const FrameworkID& frameworkId, - const TaskID& taskId, - const std::string& uuid); - - void schedulerMessage( - const process::UPID& from, - const SlaveID& slaveId, - const FrameworkID& frameworkId, - const ExecutorID& executorId, - const std::string& data); - - void registerSlave( - const process::UPID& from, - const SlaveInfo& slaveInfo, - const std::vector<Resource>& checkpointedResources, - const std::string& version); - - void reregisterSlave( - const process::UPID& from, - const SlaveInfo& slaveInfo, - const std::vector<Resource>& checkpointedResources, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<Task>& tasks, - const std::vector<Archive::Framework>& completedFrameworks, - const std::string& version); - - void unregisterSlave( - const process::UPID& from, - const SlaveID& slaveId); - - void statusUpdate( - const StatusUpdate& update, - const process::UPID& pid); - - void reconcileTasks( - const process::UPID& from, - const FrameworkID& frameworkId, - const std::vector<TaskStatus>& statuses); - - void exitedExecutor( - const process::UPID& from, - const SlaveID& slaveId, - const FrameworkID& frameworkId, - const ExecutorID& executorId, - int32_t status); - - void shutdownSlave( - const SlaveID& slaveId, - const std::string& message); - - void authenticate( - const process::UPID& from, - const process::UPID& pid); - - // TODO(bmahler): It would be preferred to use a unique libprocess - // Process identifier (PID is not sufficient) for identifying the - // framework instance, rather than relying on re-registration time. - void frameworkFailoverTimeout( - const FrameworkID& frameworkId, - const process::Time& reregisteredTime); + Slave(const SlaveInfo& _info, + const process::UPID& _pid, + const Option<std::string> _version, + const process::Time& _registeredTime, + const Resources& _checkpointedResources, + const std::vector<ExecutorInfo> executorInfos = + std::vector<ExecutorInfo>(), + const std::vector<Task> tasks = + std::vector<Task>()) + : id(_info.id()), + info(_info), + pid(_pid), + version(_version), + registeredTime(_registeredTime), + connected(true), + active(true), + checkpointedResources(_checkpointedResources), + observer(NULL) + { + CHECK(_info.has_id()); - void offer( - const FrameworkID& framework, - const hashmap<SlaveID, Resources>& resources); + Try<Resources> resources = applyCheckpointedResources( + info.resources(), + _checkpointedResources); - // Invoked when there is a newly elected leading master. - // Made public for testing purposes. - void detected(const process::Future<Option<MasterInfo>>& pid); + // NOTE: This should be validated during slave recovery. + CHECK_SOME(resources); + totalResources = resources.get(); - // Invoked when the contender has lost the candidacy. - // Made public for testing purposes. - void lostCandidacy(const process::Future<Nothing>& lost); + foreach (const ExecutorInfo& executorInfo, executorInfos) { + CHECK(executorInfo.has_framework_id()); + addExecutor(executorInfo.framework_id(), executorInfo); + } - // Continuation of recover(). - // Made public for testing purposes. - process::Future<Nothing> _recover(const Registry& registry); + foreach (const Task& task, tasks) { + addTask(new Task(task)); + } + } - // Continuation of reregisterSlave(). - // Made public for testing purposes. - // TODO(vinod): Instead of doing this create and use a - // MockRegistrar. - // TODO(dhamon): Consider FRIEND_TEST macro from gtest. - void _reregisterSlave( - const SlaveInfo& slaveInfo, - const process::UPID& pid, - const std::vector<Resource>& checkpointedResources, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<Task>& tasks, - const std::vector<Archive::Framework>& completedFrameworks, - const std::string& version, - const process::Future<bool>& readmit); + ~Slave() {} - MasterInfo info() const + Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId) { - return info_; + if (tasks.contains(frameworkId) && tasks[frameworkId].contains(taskId)) { + return tasks[frameworkId][taskId]; + } + return NULL; } -protected: - virtual void initialize(); - virtual void finalize(); - virtual void exited(const process::UPID& pid); - virtual void visit(const process::MessageEvent& event); - virtual void visit(const process::ExitedEvent& event); - - // Invoked when the message is ready to be executed after - // being throttled. - // 'principal' being None indicates it is throttled by - // 'defaultLimiter'. - void throttled( - const process::MessageEvent& event, - const Option<std::string>& principal); - - // Continuations of visit(). - void _visit(const process::MessageEvent& event); - void _visit(const process::ExitedEvent& event); - - // Helper method invoked when the capacity for a framework - // principal is exceeded. - void exceededCapacity( - const process::MessageEvent& event, - const Option<std::string>& principal, - uint64_t capacity); - - // Recovers state from the registrar. - process::Future<Nothing> recover(); - void recoveredSlavesTimeout(const Registry& registry); + void addTask(Task* task) + { + const TaskID& taskId = task->task_id(); + const FrameworkID& frameworkId = task->framework_id(); - void _registerSlave( - const SlaveInfo& slaveInfo, - const process::UPID& pid, - const std::vector<Resource>& checkpointedResources, - const std::string& version, - const process::Future<bool>& admit); + CHECK(!tasks[frameworkId].contains(taskId)) + << "Duplicate task " << taskId << " of framework " << frameworkId; - void __reregisterSlave( - Slave* slave, - const std::vector<Task>& tasks); + tasks[frameworkId][taskId] = task; - // 'authenticate' is the future returned by the authenticator. - void _authenticate( - const process::UPID& pid, - const process::Future<Option<std::string>>& authenticate); + if (!protobuf::isTerminalState(task->state())) { + usedResources[frameworkId] += task->resources(); + } - void authenticationTimeout(process::Future<Option<std::string>> future); + LOG(INFO) << "Adding task " << taskId + << " with resources " << task->resources() + << " on slave " << id << " (" << info.hostname() << ")"; + } - void fileAttached(const process::Future<Nothing>& result, - const std::string& path); + // Notification of task termination, for resource accounting. + // TODO(bmahler): This is a hack for performance. We need to + // maintain resource counters because computing task resources + // functionally for all tasks is expensive, for now. + void taskTerminated(Task* task) + { + const TaskID& taskId = task->task_id(); + const FrameworkID& frameworkId = task->framework_id(); - // Invoked when the contender has entered the contest. - void contended(const process::Future<process::Future<Nothing>>& candidacy); + CHECK(protobuf::isTerminalState(task->state())); + CHECK(tasks[frameworkId].contains(taskId)) + << "Unknown task " << taskId << " of framework " << frameworkId; - // Task reconciliation, split from the message handler - // to allow re-use. - void _reconcileTasks( - Framework* framework, - const std::vector<TaskStatus>& statuses); + usedResources[frameworkId] -= task->resources(); + if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) { + usedResources.erase(frameworkId); + } + } - // Handles a known re-registering slave by reconciling the master's - // view of the slave's tasks and executors. - void reconcile( - Slave* slave, - const std::vector<ExecutorInfo>& executors, - const std::vector<Task>& tasks); + void removeTask(Task* task) + { + const TaskID& taskId = task->task_id(); + const FrameworkID& frameworkId = task->framework_id(); - // 'registerFramework()' continuation. - void _registerFramework( - const process::UPID& from, - const FrameworkInfo& frameworkInfo, - const process::Future<Option<Error>>& validationError); + CHECK(tasks[frameworkId].contains(taskId)) + << "Unknown task " << taskId << " of framework " << frameworkId; - // 'reregisterFramework()' continuation. - void _reregisterFramework( - const process::UPID& from, - const FrameworkInfo& frameworkInfo, - bool failover, - const process::Future<Option<Error>>& validationError); + if (!protobuf::isTerminalState(task->state())) { + usedResources[frameworkId] -= task->resources(); + if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) { + usedResources.erase(frameworkId); + } + } - // Add a framework. - void addFramework(Framework* framework); + tasks[frameworkId].erase(taskId); + if (tasks[frameworkId].empty()) { + tasks.erase(frameworkId); + } - // Replace the scheduler for a framework with a new process ID, in - // the event of a scheduler failover. - void failoverFramework(Framework* framework, const process::UPID& newPid); + killedTasks.remove(frameworkId, taskId); + } - // Kill all of a framework's tasks, delete the framework object, and - // reschedule offers that were assigned to this framework. - void removeFramework(Framework* framework); + void addOffer(Offer* offer) + { + CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); - // Remove a framework from the slave, i.e., remove its tasks and - // executors and recover the resources. - void removeFramework(Slave* slave, Framework* framework); + offers.insert(offer); + offeredResources += offer->resources(); + } - void disconnect(Framework* framework); - void deactivate(Framework* framework); + void removeOffer(Offer* offer) + { + CHECK(offers.contains(offer)) << "Unknown offer " << offer->id(); - void disconnect(Slave* slave); - void deactivate(Slave* slave); + offeredResources -= offer->resources(); + offers.erase(offer); + } - // Add a slave. - void addSlave( - Slave* slave, - const std::vector<Archive::Framework>& completedFrameworks = - std::vector<Archive::Framework>()); + bool hasExecutor(const FrameworkID& frameworkId, + const ExecutorID& executorId) const + { + return executors.contains(frameworkId) && + executors.get(frameworkId).get().contains(executorId); + } - // Remove the slave from the registrar. Called when the slave - // does not re-register in time after a master failover. - Nothing removeSlave(const Registry::Slave& slave); + void addExecutor(const FrameworkID& frameworkId, + const ExecutorInfo& executorInfo) + { + CHECK(!hasExecutor(frameworkId, executorInfo.executor_id())) + << "Duplicate executor " << executorInfo.executor_id() + << " of framework " << frameworkId; - // Remove the slave from the registrar and from the master's state. - // - // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved. - void removeSlave( - Slave* slave, - const std::string& message, - Option<process::metrics::Counter> reason = None()); + executors[frameworkId][executorInfo.executor_id()] = executorInfo; + usedResources[frameworkId] += executorInfo.resources(); + } - void _removeSlave( - const SlaveInfo& slaveInfo, - const std::vector<StatusUpdate>& updates, - const process::Future<bool>& removed, - const std::string& message, - Option<process::metrics::Counter> reason = None()); + void removeExecutor(const FrameworkID& frameworkId, + const ExecutorID& executorId) + { + CHECK(hasExecutor(frameworkId, executorId)) + << "Unknown executor " << executorId << " of framework " << frameworkId; - // Authorizes the task. - // Returns true if task is authorized. - // Returns false if task is not authorized. - // Returns failure for transient authorization failures. - process::Future<bool> authorizeTask( - const TaskInfo& task, - Framework* framework); + usedResources[frameworkId] -= + executors[frameworkId][executorId].resources(); - // Add the task and its executor (if not already running) to the - // framework and slave. Returns the resources consumed as a result, - // which includes resources for the task and its executor - // (if not already running). - Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave); + // XXX Remove. - // Transitions the task, and recovers resources if the task becomes - // terminal. - void updateTask(Task* task, const StatusUpdate& update); + executors[frameworkId].erase(executorId); + if (executors[frameworkId].empty()) { + executors.erase(frameworkId); + } + } - // Removes the task. - void removeTask(Task* task); + void apply(const Offer::Operation& operation) + { + Try<Resources> resources = totalResources.apply(operation); + CHECK_SOME(resources); - // Remove an executor and recover its resources. - void removeExecutor( - Slave* slave, - const FrameworkID& frameworkId, - const ExecutorID& executorId); + totalResources = resources.get(); + checkpointedResources = totalResources.filter(needCheckpointing); + } - // Updates slave's resources by applying the given operation. It - // also updates the allocator and sends a CheckpointResourcesMessage - // to the slave with slave's current checkpointed resources. - void applyOfferOperation( - Framework* framework, - Slave* slave, - const Offer::Operation& operation); + const SlaveID id; + const SlaveInfo info; - // Forwards the update to the framework. - void forward( - const StatusUpdate& update, - const process::UPID& acknowledgee, - Framework* framework); + process::UPID pid; - // Remove an offer after specified timeout - void offerTimeout(const OfferID& offerId); + // The Mesos version of the slave. If set, the slave is >= 0.21.0. + // TODO(bmahler): Use stout's Version when it can parse labels, etc. + // TODO(bmahler): Make this required once it is always set. + const Option<std::string> version; - // Remove an offer and optionally rescind the offer as well. - void removeOffer(Offer* offer, bool rescind = false); + process::Time registeredTime; + Option<process::Time> reregisteredTime; - Framework* getFramework(const FrameworkID& frameworkId); - Slave* getSlave(const SlaveID& slaveId); - Offer* getOffer(const OfferID& offerId); + // Slave becomes disconnected when the socket closes. + bool connected; - FrameworkID newFrameworkId(); - OfferID newOfferId(); - SlaveID newSlaveId(); + // Slave becomes deactivated when it gets disconnected. In the + // future this might also happen via HTTP endpoint. + // No offers will be made for a deactivated slave. + bool active; - Option<Credentials> credentials; + // Executors running on this slave. + hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo>> executors; -private: - void drop( - const process::UPID& from, - const scheduler::Call& call, - const std::string& message); + // Tasks present on this slave. + // TODO(bmahler): The task pointer ownership complexity arises from the fact + // that we own the pointer here, but it's shared with the Framework struct. + // We should find a way to eliminate this. + hashmap<FrameworkID, hashmap<TaskID, Task*>> tasks; - void drop( - Framework* framework, - const Offer::Operation& operation, - const std::string& message); + // Tasks that were asked to kill by frameworks. + // This is used for reconciliation when the slave re-registers. + multihashmap<FrameworkID, TaskID> killedTasks; - // Call handlers. - void receive( - const process::UPID& from, - const scheduler::Call& call); + // Active offers on this slave. + hashset<Offer*> offers; - void accept( - Framework* framework, - const scheduler::Call::Accept& accept); + hashmap<FrameworkID, Resources> usedResources; // Active task / executors. + Resources offeredResources; // Offers. - void _accept( - const FrameworkID& frameworkId, - const SlaveID& slaveId, - const Resources& offeredResources, - const scheduler::Call::Accept& accept, - const process::Future<std::list<process::Future<bool>>>& authorizations); + // Resources that should be checkpointed by the slave (e.g., + // persistent volumes, dynamic reservations, etc). These are either + // in use by a task/executor, or are available for use and will be + // re-offered to the framework. + Resources checkpointedResources; - void reconcile( - Framework* framework, - const scheduler::Call::Reconcile& reconcile); + // The current total resources of the slave. Note that this is + // different from 'info.resources()' because this also consider + // operations (e.g., CREATE, RESERVE) that have been applied. + Resources totalResources; - void kill( - Framework* framework, - const scheduler::Call::Kill& kill); + SlaveObserver* observer; - void shutdown( - Framework* framework, - const scheduler::Call::Shutdown& shutdown); +private: + Slave(const Slave&); // No copying. + Slave& operator = (const Slave&); // No assigning. +}; - bool elected() const - { - return leader.isSome() && leader.get() == info_; - } - // Inner class used to namespace HTTP route handlers (see - // master/http.cpp for implementations). - class Http - { - public: - explicit Http(Master* _master) : master(_master) {} +inline std::ostream& operator << (std::ostream& stream, const Slave& slave) +{ + return stream << slave.id << " at " << slave.pid + << " (" << slave.info.hostname() << ")"; +} - // Logs the request, route handlers can compose this with the - // desired request handler to get consistent request logging. - static void log(const process::http::Request& request); - // /master/health - process::Future<process::http::Response> health( - const process::http::Request& request) const; +// Information about a connected or completed framework. +// TODO(bmahler): Keeping the task and executor information in sync +// across the Slave and Framework structs is error prone! +struct Framework +{ + Framework(const FrameworkInfo& _info, + const process::UPID& _pid, + const process::Time& time = process::Clock::now()) + : info(_info), + pid(_pid), + connected(true), + active(true), + registeredTime(time), + reregisteredTime(time), + completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {} - // /master/observe - process::Future<process::http::Response> observe( - const process::http::Request& request) const; + ~Framework() {} - // /master/redirect - process::Future<process::http::Response> redirect( - const process::http::Request& request) const; + Task* getTask(const TaskID& taskId) + { + if (tasks.count(taskId) > 0) { + return tasks[taskId]; + } else { + return NULL; + } + } - // /master/roles.json - process::Future<process::http::Response> roles( - const process::http::Request& request) const; + void addTask(Task* task) + { + CHECK(!tasks.contains(task->task_id())) + << "Duplicate task " << task->task_id() + << " of framework " << task->framework_id(); - // /master/teardown and /master/shutdown (deprecated). - process::Future<process::http::Response> teardown( - const process::http::Request& request) const; + tasks[task->task_id()] = task; - // /master/slaves - process::Future<process::http::Response> slaves( - const process::http::Request& request) const; + if (!protobuf::isTerminalState(task->state())) { + totalUsedResources += task->resources(); + usedResources[task->slave_id()] += task->resources(); + } + } - // /master/state.json - process::Future<process::http::Response> state( - const process::http::Request& request) const; + // Notification of task termination, for resource accounting. + // TODO(bmahler): This is a hack for performance. We need to + // maintain resource counters because computing task resources + // functionally for all tasks is expensive, for now. + void taskTerminated(Task* task) + { + CHECK(protobuf::isTerminalState(task->state())); + CHECK(tasks.contains(task->task_id())) + << "Unknown task " << task->task_id() + << " of framework " << task->framework_id(); - // /master/state-summary - process::Future<process::http::Response> stateSummary( - const process::http::Request& request) const; + totalUsedResources -= task->resources(); + usedResources[task->slave_id()] -= task->resources(); + if (usedResources[task->slave_id()].empty()) { + usedResources.erase(task->slave_id()); + } + } - // /master/tasks.json - process::Future<process::http::Response> tasks( - const process::http::Request& request) const; + void addCompletedTask(const Task& task) + { + // TODO(adam-mesos): Check if completed task already exists. + completedTasks.push_back(std::shared_ptr<Task>(new Task(task))); + } - const static std::string HEALTH_HELP; - const static std::string OBSERVE_HELP; - const static std::string REDIRECT_HELP; - const static std::string SHUTDOWN_HELP; // Deprecated. - const static std::string TEARDOWN_HELP; - const static std::string SLAVES_HELP; - const static std::string TASKS_HELP; + void removeTask(Task* task) + { + CHECK(tasks.contains(task->task_id())) + << "Unknown task " << task->task_id() + << " of framework " << task->framework_id(); - private: - // Helper for doing authentication, returns the credential used if - // the authentication was successful (or none if no credentials - // have been given to the master), otherwise an Error. - Result<Credential> authenticate( - const process::http::Request& request) const; + if (!protobuf::isTerminalState(task->state())) { + totalUsedResources -= task->resources(); + usedResources[task->slave_id()] -= task->resources(); + if (usedResources[task->slave_id()].empty()) { + usedResources.erase(task->slave_id()); + } + } - // Continuations. - process::Future<process::http::Response> _teardown( - const FrameworkID& id, - bool authorized = true) const; + addCompletedTask(*task); - Master* master; - }; + tasks.erase(task->task_id()); + } - Master(const Master&); // No copying. - Master& operator = (const Master&); // No assigning. + void addOffer(Offer* offer) + { + CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); + offers.insert(offer); + totalOfferedResources += offer->resources(); + offeredResources[offer->slave_id()] += offer->resources(); + } - friend struct Metrics; + void removeOffer(Offer* offer) + { + CHECK(offers.find(offer) != offers.end()) + << "Unknown offer " << offer->id(); - // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to - // make the following functions friends so that validation functions - // can get Offer* and Slave*. - friend Offer* validation::offer::getOffer( - Master* master, const OfferID& offerId); + totalOfferedResources -= offer->resources(); + offeredResources[offer->slave_id()] -= offer->resources(); + if (offeredResources[offer->slave_id()].empty()) { + offeredResources.erase(offer->slave_id()); + } - friend Slave* validation::offer::getSlave( - Master* master, const SlaveID& slaveId); + offers.erase(offer); + } - const Flags flags; + bool hasExecutor(const SlaveID& slaveId, + const ExecutorID& executorId) + { + return executors.contains(slaveId) && + executors[slaveId].contains(executorId); + } - Option<MasterInfo> leader; // Current leading master. + void addExecutor(const SlaveID& slaveId, + const ExecutorInfo& executorInfo) + { + CHECK(!hasExecutor(slaveId, executorInfo.executor_id())) + << "Duplicate executor " << executorInfo.executor_id() + << " on slave " << slaveId; - mesos::master::allocator::Allocator* allocator; - WhitelistWatcher* whitelistWatcher; - Registrar* registrar; - Repairer* repairer; - Files* files; + executors[slaveId][executorInfo.executor_id()] = executorInfo; + totalUsedResources += executorInfo.resources(); + usedResources[slaveId] += executorInfo.resources(); + } - MasterContender* contender; - MasterDetector* detector; + void removeExecutor(const SlaveID& slaveId, + const ExecutorID& executorId) + { + CHECK(hasExecutor(slaveId, executorId)) + << "Unknown executor " << executorId + << " of framework " << id() + << " of slave " << slaveId; - const Option<Authorizer*> authorizer; + totalUsedResources -= executors[slaveId][executorId].resources(); + usedResources[slaveId] -= executors[slaveId][executorId].resources(); + if (usedResources[slaveId].empty()) { + usedResources.erase(slaveId); + } - MasterInfo info_; + executors[slaveId].erase(executorId); + if (executors[slaveId].empty()) { + executors.erase(slaveId); + } + } - // Indicates when recovery is complete. Recovery begins once the - // master is elected as a leader. - Option<process::Future<Nothing>> recovered; + const FrameworkID id() const { return info.id(); } - struct Slaves + // Update fields in 'info' using those in 'source'. Currently this + // only updates 'name', 'failover_timeout', 'hostname', and + // 'webui_url'. + void updateFrameworkInfo(const FrameworkInfo& source) { - Slaves() : removed(MAX_REMOVED_SLAVES) {} + // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because + // of MESOS-2559. Once this is fixed we can 'CHECK' that we only + // merge 'info' from the same framework 'id'. - // Imposes a time limit for slaves that we recover from the - // registry to re-register with the master. - Option<process::Timer> recoveredTimer; + // TODO(jmlvanre): Merge other fields as per design doc in + // MESOS-703. - // Slaves that have been recovered from the registrar but have yet - // to re-register. We keep a "reregistrationTimer" above to ensure - // we remove these slaves if they do not re-register. - hashset<SlaveID> recovered; + if (source.user() != info.user()) { + LOG(WARNING) << "Can not update FrameworkInfo.user to '" << info.user() + << "' for framework " << id() << ". Check MESOS-703"; + } - // Slaves that are in the process of registering. - hashset<process::UPID> registering; + info.set_name(source.name()); - // Only those slaves that are re-registering for the first time - // with this master. We must not answer questions related to - // these slaves until the registrar determines their fate. - hashset<SlaveID> reregistering; + if (source.has_failover_timeout()) { + info.set_failover_timeout(source.failover_timeout()); + } else { + info.clear_failover_timeout(); + } - hashmap<SlaveID, Slave*> registered; + if (source.checkpoint() != info.checkpoint()) { + LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '" + << stringify(info.checkpoint()) << "' for framework " << id() + << ". Check MESOS-703"; + } - // Slaves that are in the process of being removed from the - // registrar. Think of these as being partially removed: we must - // not answer questions related to these until they are removed - // from the registry. - hashset<SlaveID> removing; + if (source.role() != info.role()) { + LOG(WARNING) << "Can not update FrameworkInfo.role to '" << info.role() + << "' for framework " << id() << ". Check MESOS-703"; + } - // We track removed slaves to preserve the consistency - // semantics of the pre-registrar code when a non-strict registrar - // is being used. That is, if we remove a slave, we must make - // an effort to prevent it from (re-)registering, sending updates, - // etc. We keep a cache here to prevent this from growing in an - // unbounded manner. - // TODO(bmahler): Ideally we could use a cache with set semantics. - Cache<SlaveID, Nothing> removed; + if (source.has_hostname()) { + info.set_hostname(source.hostname()); + } else { + info.clear_hostname(); + } - // This rate limiter is used to limit the removal of slaves failing - // health checks. - // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is - // a wrapper around libprocess process which is thread safe. - Option<std::shared_ptr<process::RateLimiter>> limiter; + if (source.principal() != info.principal()) { + LOG(WARNING) << "Can not update FrameworkInfo.principal to '" + << info.principal() << "' for framework " << id() + << ". Check MESOS-703"; + } - bool transitioning(const Option<SlaveID>& slaveId) - { - if (slaveId.isSome()) { - return recovered.contains(slaveId.get()) || - reregistering.contains(slaveId.get()) || - removing.contains(slaveId.get()); - } else { - return !recovered.empty() || - !reregistering.empty() || - !removing.empty(); - } + if (source.has_webui_url()) { + info.set_webui_url(source.webui_url()); + } else { + info.clear_webui_url(); } - } slaves; + } - struct Frameworks - { - Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {} + FrameworkInfo info; - hashmap<FrameworkID, Framework*> registered; - boost::circular_buffer<std::shared_ptr<Framework>> completed; + process::UPID pid; - // Principals of frameworks keyed by PID. - // NOTE: Multiple PIDs can map to the same principal. The - // principal is None when the framework doesn't specify it. - // The differences between this map and 'authenticated' are: - // 1) This map only includes *registered* frameworks. The mapping - // is added when a framework (re-)registers. - // 2) This map includes unauthenticated frameworks (when Master - // allows them) if they have principals specified in - // FrameworkInfo. - hashmap<process::UPID, Option<std::string>> principals; + // Framework becomes disconnected when the socket closes. + bool connected; - // BoundedRateLimiters keyed by the framework principal. - // Like Metrics::Frameworks, all frameworks of the same principal - // are throttled together at a common rate limit. - hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters; + // Framework becomes deactivated when it is disconnected or + // the master receives a DeactivateFrameworkMessage. + // No offers will be made to a deactivated framework. + bool active; - // The default limiter is for frameworks not specified in - // 'flags.rate_limits'. - Option<process::Owned<BoundedRateLimiter>> defaultLimiter; - } frameworks; + process::Time registeredTime; + process::Time reregisteredTime; + process::Time unregisteredTime; - hashmap<OfferID, Offer*> offers; - hashmap<OfferID, process::Timer> offerTimers; + // Tasks that have not yet been launched because they are currently + // being authorized. + hashmap<TaskID, TaskInfo> pendingTasks; - hashmap<std::string, Role*> roles; + hashmap<TaskID, Task*> tasks; - // Authenticator names as supplied via flags. - std::vector<std::string> authenticatorNames; + // NOTE: We use a shared pointer for Task because clang doesn't like + // Boost's implementation of circular_buffer with Task (Boost + // attempts to do some memset's which are unsafe). + boost::circular_buffer<std::shared_ptr<Task>> completedTasks; - Option<Authenticator*> authenticator; + hashset<Offer*> offers; // Active offers for framework. - // Frameworks/slaves that are currently in the process of authentication. - // 'authenticating' future is completed when authenticator - // completes authentication. - // The future is removed from the map when master completes authentication. - hashmap<process::UPID, process::Future<Option<std::string>>> authenticating; + hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors; - // Principals of authenticated frameworks/slaves keyed by PID. - hashmap<process::UPID, std::string> authenticated; + // NOTE: For the used and offered resources below, we keep the + // total as well as partitioned by SlaveID. + // We expose the total resources via the HTTP endpoint, and we + // keep a running total of the resources because looping over the + // slaves to sum the resources has led to perf issues (MESOS-1862). + // We keep the resources partitioned by SlaveID because non-scalar + // resources can be lost when summing them up across multiple + // slaves (MESOS-2373). + // + // Also note that keeping the totals is safe even though it yields + // incorrect results for non-scalar resources. + // (1) For overlapping set items / ranges across slaves, these + // will get added N times but only represented once. + // (2) When an initial subtraction occurs (N-1), the resource is + // no longer represented. (This is the source of the bug). + // (3) When any further subtractions occur (N-(1+M)), the + // Resources simply ignores the subtraction since there's + // nothing to remove, so this is safe for now. - int64_t nextFrameworkId; // Used to give each framework a unique ID. - int64_t nextOfferId; // Used to give each slot offer a unique ID. - int64_t nextSlaveId; // Used to give each slave a unique ID. + // TODO(mpark): Strip the non-scalar resources out of the totals + // in order to avoid reporting incorrect statistics (MESOS-2623). - // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is - // thread safe. - // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains - // copyable metric types only. - std::shared_ptr<Metrics> metrics; + // Active task / executor resources. + Resources totalUsedResources; + hashmap<SlaveID, Resources> usedResources; - // Gauge handlers. - double _uptime_secs() - { - return (process::Clock::now() - startTime).secs(); - } + // Offered resources. + Resources totalOfferedResources; + hashmap<SlaveID, Resources> offeredResources; - double _elected() - { - return elected() ? 1 : 0; - } +private: + Framework(const Framework&); // No copying. + Framework& operator = (const Framework&); // No assigning. +}; + + +inline std::ostream& operator << ( + std::ostream& stream, + const Framework& framework) +{ + // TODO(vinod): Also log the hostname once FrameworkInfo is properly + // updated on framework failover (MESOS-1784). + return stream << framework.id() << " (" << framework.info.name() + << ") at " << framework.pid; +} - double _slaves_connected(); - double _slaves_disconnected(); - double _slaves_active(); - double _slaves_inactive(); - double _frameworks_connected(); - double _frameworks_disconnected(); - double _frameworks_active(); - double _frameworks_inactive(); +// Information about an active role. +struct Role +{ + explicit Role(const mesos::master::RoleInfo& _info) + : info(_info) {} - double _outstanding_offers() + void addFramework(Framework* framework) { - return offers.size(); + frameworks[framework->id()] = framework; } - double _event_queue_messages() + void removeFramework(Framework* framework) { - return static_cast<double>(eventCount<process::MessageEvent>()); + frameworks.erase(framework->id()); } - double _event_queue_dispatches() + Resources resources() const { - return static_cast<double>(eventCount<process::DispatchEvent>()); - } + Resources resources; + foreachvalue (Framework* framework, frameworks) { + resources += framework->totalUsedResources; + resources += framework->totalOfferedResources; + } - double _event_queue_http_requests() - { - return static_cast<double>(eventCount<process::HttpEvent>()); + return resources; } - double _tasks_staging(); - double _tasks_starting(); - double _tasks_running(); + mesos::master::RoleInfo info; - double _resources_total(const std::string& name); - double _resources_used(const std::string& name); - double _resources_percent(const std::string& name); + hashmap<FrameworkID, Framework*> frameworks; +}; - process::Time startTime; // Start time used to calculate uptime. - Option<process::Time> electedTime; // Time when this master is elected. +class Master : public ProtobufProcess<Master> +{ +public: + Master(mesos::master::allocator::Allocator* allocator, + Registrar* registrar, + Repairer* repairer, + Files* files, + MasterContender* contender, + MasterDetector* detector, + const Option<Authorizer*>& authorizer, + const Option<std::shared_ptr<process::RateLimiter>>& + slaveRemovalLimiter, + const Flags& flags = Flags()); - // Validates the framework including authorization. - // Returns None if the framework is valid. - // Returns Error if the framework is invalid. - // Returns Failure if authorization returns 'Failure'. - process::Future<Option<Error>> validate( + virtual ~Master(); + + // Message handlers. + void submitScheduler( + const std::string& name); + + void registerFramework( + const process::UPID& from, + const FrameworkInfo& frameworkInfo); + + void reregisterFramework( + const process::UPID& from, const FrameworkInfo& frameworkInfo, - const process::UPID& from); -}; + bool failover); + void unregisterFramework( + const process::UPID& from, + const FrameworkID& frameworkId); -struct Slave -{ - Slave(const SlaveInfo& _info, - const process::UPID& _pid, - const Option<std::string> _version, - const process::Time& _registeredTime, - const Resources& _checkpointedResources, - const std::vector<ExecutorInfo> executorInfos = - std::vector<ExecutorInfo>(), - const std::vector<Task> tasks = - std::vector<Task>()) - : id(_info.id()), - info(_info), - pid(_pid), - version(_version), - registeredTime(_registeredTime), - connected(true), - active(true), - checkpointedResources(_checkpointedResources), - observer(NULL) - { - CHECK(_info.has_id()); + void deactivateFramework( + const process::UPID& from, + const FrameworkID& frameworkId); - Try<Resources> resources = applyCheckpointedResources( - info.resources(), - _checkpointedResources); + // TODO(vinod): Remove this once the old driver is removed. + void resourceRequest( + const process::UPID& from, + const FrameworkID& frameworkId, + const std::vector<Request>& requests); - // NOTE: This should be validated during slave recovery. - CHECK_SOME(resources); - totalResources = resources.get(); + void launchTasks( + const process::UPID& from, + const FrameworkID& frameworkId, + const std::vector<TaskInfo>& tasks, + const Filters& filters, + const std::vector<OfferID>& offerIds); - foreach (const ExecutorInfo& executorInfo, executorInfos) { - CHECK(executorInfo.has_framework_id()); - addExecutor(executorInfo.framework_id(), executorInfo); - } + void reviveOffers( + const process::UPID& from, + const FrameworkID& frameworkId); - foreach (const Task& task, tasks) { - addTask(new Task(task)); - } - } + void killTask( + const process::UPID& from, + const FrameworkID& frameworkId, + const TaskID& taskId); - ~Slave() {} + void statusUpdateAcknowledgement( + const process::UPID& from, + const SlaveID& slaveId, + const FrameworkID& frameworkId, + const TaskID& taskId, + const std::string& uuid); - Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId) - { - if (tasks.contains(frameworkId) && tasks[frameworkId].contains(taskId)) { - return tasks[frameworkId][taskId]; - } - return NULL; - } + void schedulerMessage( + const process::UPID& from, + const SlaveID& slaveId, + const FrameworkID& frameworkId, + const ExecutorID& executorId, + const std::string& data); - void addTask(Task* task) - { - const TaskID& taskId = task->task_id(); - const FrameworkID& frameworkId = task->framework_id(); + void registerSlave( + const process::UPID& from, + const SlaveInfo& slaveInfo, + const std::vector<Resource>& checkpointedResources, + const std::string& version); - CHECK(!tasks[frameworkId].contains(taskId)) - << "Duplicate task " << taskId << " of framework " << frameworkId; + void reregisterSlave( + const process::UPID& from, + const SlaveInfo& slaveInfo, + const std::vector<Resource>& checkpointedResources, + const std::vector<ExecutorInfo>& executorInfos, + const std::vector<Task>& tasks, + const std::vector<Archive::Framework>& completedFrameworks, + const std::string& version); - tasks[frameworkId][taskId] = task; + void unregisterSlave( + const process::UPID& from, + const SlaveID& slaveId); - if (!protobuf::isTerminalState(task->state())) { - usedResources[frameworkId] += task->resources(); - } + void statusUpdate( + const StatusUpdate& update, + const process::UPID& pid); + + void reconcileTasks( + const process::UPID& from, + const FrameworkID& frameworkId, + const std::vector<TaskStatus>& statuses); + + void exitedExecutor( + const process::UPID& from, + const SlaveID& slaveId, + const FrameworkID& frameworkId, + const ExecutorID& executorId, + int32_t status); + + void shutdownSlave( + const SlaveID& slaveId, + const std::string& message); + + void authenticate( + const process::UPID& from, + const process::UPID& pid); + + // TODO(bmahler): It would be preferred to use a unique libprocess + // Process identifier (PID is not sufficient) for identifying the + // framework instance, rather than relying on re-registration time. + void frameworkFailoverTimeout( + const FrameworkID& frameworkId, + const process::Time& reregisteredTime); + + void offer( + const FrameworkID& framework, + const hashmap<SlaveID, Resources>& resources); + + // Invoked when there is a newly elected leading master. + // Made public for testing purposes. + void detected(const process::Future<Option<MasterInfo>>& pid); + + // Invoked when the contender has lost the candidacy. + // Made public for testing purposes. + void lostCandidacy(const process::Future<Nothing>& lost); + + // Continuation of recover(). + // Made public for testing purposes. + process::Future<Nothing> _recover(const Registry& registry); + + // Continuation of reregisterSlave(). + // Made public for testing purposes. + // TODO(vinod): Instead of doing this create and use a + // MockRegistrar. + // TODO(dhamon): Consider FRIEND_TEST macro from gtest. + void _reregisterSlave( + const SlaveInfo& slaveInfo, + const process::UPID& pid, + const std::vector<Resource>& checkpointedResources, + const std::vector<ExecutorInfo>& executorInfos, + const std::vector<Task>& tasks, + const std::vector<Archive::Framework>& completedFrameworks, + const std::string& version, + const process::Future<bool>& readmit); + + MasterInfo info() const + { + return info_; + } + +protected: + virtual void initialize(); + virtual void finalize(); + virtual void exited(const process::UPID& pid); + virtual void visit(const process::MessageEvent& event); + virtual void visit(const process::ExitedEvent& event); - LOG(INFO) << "Adding task " << taskId - << " with resources " << task->resources() - << " on slave " << id << " (" << info.hostname() << ")"; - } + // Invoked when the message is ready to be executed after + // being throttled. + // 'principal' being None indicates it is throttled by + // 'defaultLimiter'. + void throttled( + const process::MessageEvent& event, + const Option<std::string>& principal); - // Notification of task termination, for resource accounting. - // TODO(bmahler): This is a hack for performance. We need to - // maintain resource counters because computing task resources - // functionally for all tasks is expensive, for now. - void taskTerminated(Task* task) - { - const TaskID& taskId = task->task_id(); - const FrameworkID& frameworkId = task->framework_id(); + // Continuations of visit(). + void _visit(const process::MessageEvent& event); + void _visit(const process::ExitedEvent& event); - CHECK(protobuf::isTerminalState(task->state())); - CHECK(tasks[frameworkId].contains(taskId)) - << "Unknown task " << taskId << " of framework " << frameworkId; + // Helper method invoked when the capacity for a framework + // principal is exceeded. + void exceededCapacity( + const process::MessageEvent& event, + const Option<std::string>& principal, + uint64_t capacity); - usedResources[frameworkId] -= task->resources(); - if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) { - usedResources.erase(frameworkId); - } - } + // Recovers state from the registrar. + process::Future<Nothing> recover(); + void recoveredSlavesTimeout(const Registry& registry); - void removeTask(Task* task) - { - const TaskID& taskId = task->task_id(); - const FrameworkID& frameworkId = task->framework_id(); + void _registerSlave( + const SlaveInfo& slaveInfo, + const process::UPID& pid, + const std::vector<Resource>& checkpointedResources, + const std::string& version, + const process::Future<bool>& admit); - CHECK(tasks[frameworkId].contains(taskId)) - << "Unknown task " << taskId << " of framework " << frameworkId; + void __reregisterSlave( + Slave* slave, + const std::vector<Task>& tasks); - if (!protobuf::isTerminalState(task->state())) { - usedResources[frameworkId] -= task->resources(); - if (!tasks.contains(frameworkId) && !executors.contains(frameworkId)) { - usedResources.erase(frameworkId); - } - } + // 'authenticate' is the future returned by the authenticator. + void _authenticate( + const process::UPID& pid, + const process::Future<Option<std::string>>& authenticate); - tasks[frameworkId].erase(taskId); - if (tasks[frameworkId].empty()) { - tasks.erase(frameworkId); - } + void authenticationTimeout(process::Future<Option<std::string>> future); - killedTasks.remove(frameworkId, taskId); - } + void fileAttached(const process::Future<Nothing>& result, + const std::string& path); - void addOffer(Offer* offer) - { - CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); + // Invoked when the contender has entered the contest. + void contended(const process::Future<process::Future<Nothing>>& candidacy); - offers.insert(offer); - offeredResources += offer->resources(); - } + // Task reconciliation, split from the message handler + // to allow re-use. + void _reconcileTasks( + Framework* framework, + const std::vector<TaskStatus>& statuses); - void removeOffer(Offer* offer) - { - CHECK(offers.contains(offer)) << "Unknown offer " << offer->id(); + // Handles a known re-registering slave by reconciling the master's + // view of the slave's tasks and executors. + void reconcile( + Slave* slave, + const std::vector<ExecutorInfo>& executors, + const std::vector<Task>& tasks); - offeredResources -= offer->resources(); - offers.erase(offer); - } + // 'registerFramework()' continuation. + void _registerFramework( + const process::UPID& from, + const FrameworkInfo& frameworkInfo, + const process::Future<Option<Error>>& validationError); - bool hasExecutor(const FrameworkID& frameworkId, - const ExecutorID& executorId) const - { - return executors.contains(frameworkId) && - executors.get(frameworkId).get().contains(executorId); - } + // 'reregisterFramework()' continuation. + void _reregisterFramework( + const process::UPID& from, + const FrameworkInfo& frameworkInfo, + bool failover, + const process::Future<Option<Error>>& validationError); - void addExecutor(const FrameworkID& frameworkId, - const ExecutorInfo& executorInfo) - { - CHECK(!hasExecutor(frameworkId, executorInfo.executor_id())) - << "Duplicate executor " << executorInfo.executor_id() - << " of framework " << frameworkId; + // Add a framework. + void addFramework(Framework* framework); - executors[frameworkId][executorInfo.executor_id()] = executorInfo; - usedResources[frameworkId] += executorInfo.resources(); - } + // Replace the scheduler for a framework with a new process ID, in + // the event of a scheduler failover. + void failoverFramework(Framework* framework, const process::UPID& newPid); - void removeExecutor(const FrameworkID& frameworkId, - const ExecutorID& executorId) - { - CHECK(hasExecutor(frameworkId, executorId)) - << "Unknown executor " << executorId << " of framework " << frameworkId; + // Kill all of a framework's tasks, delete the framework object, and + // reschedule offers that were assigned to this framework. + void removeFramework(Framework* framework); - usedResources[frameworkId] -= - executors[frameworkId][executorId].resources(); + // Remove a framework from the slave, i.e., remove its tasks and + // executors and recover the resources. + void removeFramework(Slave* slave, Framework* framework); - // XXX Remove. + void disconnect(Framework* framework); + void deactivate(Framework* framework); - executors[frameworkId].erase(executorId); - if (executors[frameworkId].empty()) { - executors.erase(frameworkId); - } - } + void disconnect(Slave* slave); + void deactivate(Slave* slave); - void apply(const Offer::Operation& operation) - { - Try<Resources> resources = totalResources.apply(operation); - CHECK_SOME(resources); + // Add a slave. + void addSlave( + Slave* slave, + const std::vector<Archive::Framework>& completedFrameworks = + std::vector<Archive::Framework>()); - totalResources = resources.get(); - checkpointedResources = totalResources.filter(needCheckpointing); - } + // Remove the slave from the registrar. Called when the slave + // does not re-register in time after a master failover. + Nothing removeSlave(const Registry::Slave& slave); - const SlaveID id; - const SlaveInfo info; + // Remove the slave from the registrar and from the master's state. + // + // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved. + void removeSlave( + Slave* slave, + const std::string& message, + Option<process::metrics::Counter> reason = None()); - process::UPID pid; + void _removeSlave( + const SlaveInfo& slaveInfo, + const std::vector<StatusUpdate>& updates, + const process::Future<bool>& removed, + const std::string& message, + Option<process::metrics::Counter> reason = None()); - // The Mesos version of the slave. If set, the slave is >= 0.21.0. - // TODO(bmahler): Use stout's Version when it can parse labels, etc. - // TODO(bmahler): Make this required once it is always set. - const Option<std::string> version; + // Authorizes the task. + // Returns true if task is authorized. + // Returns false if task is not authorized. + // Returns failure for transient authorization failures. + process::Future<bool> authorizeTask( + const TaskInfo& task, + Framework* framework); - process::Time registeredTime; - Option<process::Time> reregisteredTime; + // Add the task and its executor (if not already running) to the + // framework and slave. Returns the resources consumed as a result, + // which includes resources for the task and its executor + // (if not already running). + Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave); - // Slave becomes disconnected when the socket closes. - bool connected; + // Transitions the task, and recovers resources if the task becomes + // terminal. + void updateTask(Task* task, const StatusUpdate& update); - // Slave becomes deactivated when it gets disconnected. In the - // future this might also happen via HTTP endpoint. - // No offers will be made for a deactivated slave. - bool active; + // Removes the task. + void removeTask(Task* task); - // Executors running on this slave. - hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo>> executors; + // Remove an executor and recover its resources. + void removeExecutor( + Slave* slave, + const FrameworkID& frameworkId, + const ExecutorID& executorId); - // Tasks present on this slave. - // TODO(bmahler): The task pointer ownership complexity arises from the fact - // that we own the pointer here, but it's shared with the Framework struct. - // We should find a way to eliminate this. - hashmap<FrameworkID, hashmap<TaskID, Task*>> tasks; + // Updates slave's resources by applying the given operation. It + // also updates the allocator and sends a CheckpointResourcesMessage + // to the slave with slave's current checkpointed resources. + void applyOfferOperation( + Framework* framework, + Slave* slave, + const Offer::Operation& operation); - // Tasks that were asked to kill by frameworks. - // This is used for reconciliation when the slave re-registers. - multihashmap<FrameworkID, TaskID> killedTasks; + // Forwards the update to the framework. + void forward( + const StatusUpdate& update, + const process::UPID& acknowledgee, + Framework* framework); - // Active offers on this slave. - hashset<Offer*> offers; + // Remove an offer after specified timeout + void offerTimeout(const OfferID& offerId); - hashmap<FrameworkID, Resources> usedResources; // Active task / executors. - Resources offeredResources; // Offers. + // Remove an offer and optionally rescind the offer as well. + void removeOffer(Offer* offer, bool rescind = false); - // Resources that should be checkpointed by the slave (e.g., - // persistent volumes, dynamic reservations, etc). These are either - // in use by a task/executor, or are available for use and will be - // re-offered to the framework. - Resources checkpointedResources; + Framework* getFramework(const FrameworkID& frameworkId); + Slave* getSlave(const SlaveID& slaveId); + Offer* getOffer(const OfferID& offerId); - // The current total resources of the slave. Note that this is - // different from 'info.resources()' because this also consider - // operations (e.g., CREATE, RESERVE) that have been applied. - Resources totalResources; + FrameworkID newFrameworkId(); + OfferID newOfferId(); + SlaveID newSlaveId(); - SlaveObserver* observer; + Option<Credentials> credentials; private: - Slave(const Slave&); // No copying. - Slave& operator = (const Slave&); // No assigning. -}; + void drop( + const process::UPID& from, + const scheduler::Call& call, + const std::string& message); + void drop( + Framework* framework, + const Offer::Operation& operation, + const std::string& message); -inline std::ostream& operator << (std::ostream& stream, const Slave& slave) -{ - return stream << slave.id << " at " << slave.pid - << " (" << slave.info.hostname() << ")"; -} + // Call handlers. + void receive( + const process::UPID& from, + const scheduler::Call& call); + + void accept( + Framework* framework, + const scheduler::Call::Accept& accept); + void _accept( + const FrameworkID& frameworkId, + const SlaveID& slaveId, + const Resources& offeredResources, + const scheduler::Call::Accept& accept, + const process::Future<std::list<process::Future<bool>>>& authorizations); -// Information about a connected or completed framework. -// TODO(bmahler): Keeping the task and executor information in sync -// across the Slave and Framework structs is error prone! -struct Framework -{ - Framework(const FrameworkInfo& _info, - const process::UPID& _pid, - const process::Time& time = process::Clock::now()) - : info(_info), - pid(_pid), - connected(true), - active(true), - registeredTime(time), - reregisteredTime(time), - completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {} + void reconcile( + Framework* framework, + const scheduler::Call::Reconcile& reconcile); - ~Framework() {} + void kill( + Framework* framework, + const scheduler::Call::Kill& kill); - Task* getTask(const TaskID& taskId) + void shutdown( + Framework* framework, + const scheduler::Call::Shutdown& shutdown); + + bool elected() const { - if (tasks.count(taskId) > 0) { - return tasks[taskId]; - } else { - return NULL; - } + return leader.isSome() && leader.get() == info_; } - void addTask(Task* task) + // Inner class used to namespace HTTP route handlers (see + // master/http.cpp for implementations). + class Http { - CHECK(!tasks.contains(task->task_id())) - << "Duplicate task " << task->task_id() - << " of framework " << task->framework_id(); + public: + explicit Http(Master* _master) : master(_master) {} - tasks[task->task_id()] = task; + // Logs the request, route handlers can compose this with the + // desired request handler to get consistent request logging. + static void log(const process::http::Request& request); - if (!protobuf::isTerminalState(task->state())) { - totalUsedResources += task->resources(); - usedResources[task->slave_id()] += task->resources(); - } - } + // /master/health + process::Future<process::http::Response> health( + const process::http::Request& request) const; - // Notification of task termination, for resource accounting. - // TODO(bmahler): This is a hack for performance. We need to - // maintain resource counters because computing task resources - // functionally for all tasks is expensive, for now. - void taskTerminated(Task* task) - { - CHECK(protobuf::isTerminalState(task->state())); - CHECK(tasks.contains(task->task_id())) - << "Unknown task " << task->task_id() - << " of framework " << task->framework_id(); + // /master/observe + process::Future<process::http::Response> observe( + const process::http::Request& request) const; - totalUsedResources -= task->resources(); - usedResources[task->slave_id()] -= task->resources(); - if (usedResources[task->slave_id()].empty()) { - usedResources.erase(task->slave_id()); - } - } + // /master/redirect + process::Future<process::http::Response> redirect( + const process::http::Request& request) const; - void addCompletedTask(const Task& task) - { - // TODO(adam-mesos): Check if completed task already exists. - completedTasks.push_back(std::shared_ptr<Task>(new Task(task))); - } + // /master/roles.json + process::Future<process::http::Response> roles( + const process::http::Request& request) const; - void removeTask(Task* task) - { - CHECK(tasks.contains(task->task_id())) - << "Unknown task " << task->task_id() - << " of framework " << task->framework_id(); + // /master/teardown and /master/shutdown (deprecated). + process::Future<process::http::Response> teardown( + const process::http::Request& request) const; - if (!protobuf::isTerminalState(task->state())) { - totalUsedResources -= task->resources(); - usedResources[task->slave_id()] -= task->resources(); - if (usedResources[task->slave_id()].empty()) { - usedResources.erase(task->slave_id()); - } - } + // /master/slaves + process::Future<process::http::Response> slaves( + const process::http::Request& request) const; - addCompletedTask(*task); + // /master/state.json + process::Future<process::http::Response> state( + const process::http::Request& request) const; - tasks.erase(task->task_id()); - } + // /master/state-summary + process::Future<process::http::Response> stateSummary( + const process::http::Request& request) const; - void addOffer(Offer* offer) - { - CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); - offers.insert(offer); - totalOfferedResources += offer->resources(); - offeredResources[offer->slave_id()] += offer->resources(); - } + // /master/tasks.json + process::Future<process::http::Response> tasks( + const process::http::Request& request) const; - void removeOffer(Offer* offer) - { - CHECK(offers.find(offer) != offers.end()) - << "Unknown offer " << offer->id(); + const static std::string HEALTH_HELP; + const static std::string OBSERVE_HELP; + const static std::string REDIRECT_HELP; + const static std::string SHUTDOWN_HELP; // Deprecated. + const static std::string TEARDOWN_HELP; + const static std::string SLAVES_HELP; + const static std::string TASKS_HELP; - totalOfferedResources -= offer->resources(); - offeredResources[offer->slave_id()] -= offer->resources(); - if (offeredResources[offer->slave_id()].empty()) { - offeredResources.erase(offer->slave_id()); - } + private: + // Helper for doing authentication, returns the credential used if + // the authentication was successful (or none if no credentials + // have been given to the master), otherwise an Error. + Result<Credential> authenticate( + const process::http::Request& request) const; - offers.erase(offer); - } + // Continuations. + process::Future<process::http::Response> _teardown( + const FrameworkID& id, + bool authorized = true) const; - bool hasExecutor(const SlaveID& slaveId, - const ExecutorID& executorId) - { - return executors.contains(slaveId) && - executors[slaveId].contains(executorId); - } + Master* master; + }; - void addExecutor(const SlaveID& slaveId, - const ExecutorInfo& executorInfo) - { - CHECK(!hasExecutor(slaveId, executorInfo.executor_id())) - << "Duplicate executor " << executorInfo.executor_id() - << " on slave " << slaveId; + Master(const Master&); // No copying. + Master& operator = (const Master&); // No assigning. - executors[slaveId][executorInfo.executor_id()] = executorInfo; - totalUsedResources += executorInfo.resources(); - usedResources[slaveId] += executorInfo.resources(); - } + friend struct Metrics; - void removeExecutor(const SlaveID& slaveId, - const ExecutorID& executorId) - { - CHECK(hasExecutor(slaveId, executorId)) - << "Unknown executor " << executorId - << " of framework " << id() - << " of slave " << slaveId; + // NOTE: Since 'getOffer' and 'getSlave' are protected, we need to + // make the following functions friends so that validation functions + // can get Offer* and Slave*. + friend Offer* validation::offer::getOffer( + Master* master, const OfferID& offerId); - totalUsedResources -= executors[slaveId][executorId].resources(); - usedResources[slaveId] -= executors[slaveId][executorId].resources(); - if (usedResources[slaveId].empty()) { - usedResources.erase(slaveId); - } + friend Slave* validation::offer::getSlave( + Master* master, const SlaveID& slaveId); - executors[slaveId].erase(executorId); - if (executors[slaveId].empty()) { - executors.erase(slaveId); - } - } + const Flags flags; - const FrameworkID id() const { return info.id(); } + Option<MasterInfo> leader; // Current leading master. - // Update fields in 'info' using those in 'source'. Currently this - // only updates 'name', 'failover_timeout', 'hostname', and - // 'webui_url'. - void updateFrameworkInfo(const FrameworkInfo& source) - { - // TODO(jmlvanre): We can't check 'FrameworkInfo.id' yet because - // of MESOS-2559. Once this is fixed we can 'CHECK' that we only - // merge 'info' from the same framework 'id'. + mesos::master::allocator::Allocator* allocator; + WhitelistWatcher* whitelistWatcher; + Registrar* registrar; + Repairer* repairer; + Files* files; + + MasterContender* contender; + MasterDetector* detector; + + const Option<Authorizer*> authorizer; + + MasterInfo info_; - // TODO(jmlvanre): Merge other fields as per design doc in - // MESOS-703. + // Indicates when recovery is complete. Recovery begins once the + // master is elected as a leader. + Option<process::Future<Nothing>> recovered; - if (source.user() != info.user()) { - LOG(WARNING) << "Can not update FrameworkInfo.user to '" << info.user() - << "' for framework " << id() << ". Check MESOS-703"; - } + struct Slaves + { + Slaves() : removed(MAX_REMOVED_SLAVES) {} - info.set_name(source.name()); + // Imposes a time limit for slaves that we recover from the + // registry to re-register with the master. + Option<process::Timer> recoveredTimer; - if (source.has_failover_timeout()) { - info.set_failover_timeout(source.failover_timeout()); - } else { - info.clear_failover_timeout(); - } + // Slaves that have been recovered from the registrar but have yet + // to re-register. We keep a "reregistrationTimer" above to ensure + // we remove these slaves if they do not re-register. + hashset<SlaveID> recovered; - if (source.checkpoint() != info.checkpoint()) { - LOG(WARNING) << "Can not update FrameworkInfo.checkpoint to '" - << stringify(info.checkpoint()) << "' for framework " << id() - << ". Check MESOS-703"; - } + // Slaves that are in the process of registering. + hashset<process::UPID> registering; - if (source.role() != info.role()) { - LOG(WARNING) << "Can not update FrameworkInfo.role to '" << info.role() - << "' for framework " << id() << ". Check MESOS-703"; - } + // Only those slaves that are re-registering for the first time + // with this master. We must not answer questions related to + // these slaves until the registrar determines their fate. + hashset<SlaveID> reregistering; - if (source.has_hostname()) { - info.set_hostname(source.hostname()); - } else { - info.clear_hostname(); - } + hashmap<SlaveID, Slave*> registered; - if (source.principal() != info.principal()) { - LOG(WARNING) << "Can not update FrameworkInfo.principal to '" - << info.principal() << "' for framework " << id() - << ". Check MESOS-703"; - } + // Slaves that are in the process of being removed from the + // registrar. Think of these as being partially removed: we must + // not answer questions related to these until they are removed + // from the registry. + hashset<SlaveID> removing; - if (source.has_webui_url()) { - info.set_webui_url(source.webui_url()); - } else { - info.clear_webui_url(); - } - } + // We track removed slaves to preserve the consistency + // semantics of the pre-registrar code when a non-strict registrar + // is being used. That is, if we remove a slave, we must make + // an effort to prevent it from (re-)registering, sending updates, + // etc. We keep a cache here to prevent this from growing in an + // unbounded manner. + // TODO(bmahler): Ideally we could use a cache with set semantics. + Cache<SlaveID, Nothing> removed; - FrameworkInfo info; + // This rate limiter is used to limit the removal of slaves failing + // health checks. + // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is + // a wrapper around libprocess process which is thread safe. + Option<std::shared_ptr<process::RateLimiter>> limiter; - process::UPID pid; + bool transitioning(const Option<SlaveID>& slaveId) + { + if (slaveId.isSome()) { + return recovered.contains(slaveId.get()) || + reregistering.contains(slaveId.get()) || + removing.contains(slaveId.get()); + } else { + return !recovered.empty() || + !reregistering.empty() || + !removing.empty(); + } + } + } slaves; - // Framework becomes disconnected when the socket closes. - bool connected; + struct Frameworks + { + Frameworks() : completed(MAX_COMPLETED_FRAMEWORKS) {} - // Framework becomes deactivated when it is disconnected or - // the master receives a DeactivateFrameworkMessage. - // No offers will be made to a deactivated framework. - bool active; + hashmap<FrameworkID, Framework*> registered; + boost::circular_buffer<std::shared_ptr<Framework>> completed; - process::Time registeredTime; - process::Time reregisteredTime; - process::Time unregisteredTime; + // Principals of frameworks keyed by PID. + // NOTE: Multiple PIDs can map to the same principal. The + // principal is None when the framework doesn't specify it. + // The differences between this map and 'authenticated' are: + // 1) This map only includes *registered* frameworks. The mapping + // is added when a framework (re-)registers. + // 2) This map includes unauthenticated frameworks (when Master + // allows them) if they have principals specified in + // FrameworkInfo. + hashmap<process::UPID, Option<std::string>> principals; - // Tasks that have not yet been launched because they are currently - // being authorized. - hashmap<TaskID, TaskInfo> pendingTasks; + // BoundedRateLimiters keyed by the framework principal. + // Like Metrics::Frameworks, all frameworks of the same principal + // are throttled together at a common rate limit. + hashmap<std::string, Option<process::Owned<BoundedRateLimiter>>> limiters; - hashmap<TaskID, Task*> tasks; + // The default limiter is for frameworks not specified in + // 'flags.rate_limits'. + Option<process::Owned<BoundedRateLimiter>> defaultLimiter; + } frameworks; - // NOTE: We use a shared pointer for Task because clang doesn't like - // Boost's implementation of circular_buffer with Task (Boost - // attempts to do some memset's which are unsafe). - boost::circular_buffer<std::shared_ptr<Task>> completedTasks; + hashmap<OfferID, Offer*> offers; + hashmap<OfferID, process::Timer> offerTimers; - hashset<Offer*> offers; // Active offers for framework. + hashmap<std::string, Role*> roles; - hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors; + // Authenticator names as supplied via flags. + std::vector<std::string> authenticatorNames; - // NOTE: For the used and offered resources below, we keep the - // total as well as partitioned by SlaveID. - // We expose the total resources via the HTTP endpoint, and we - // keep a running total of the resources because looping over the - // slaves to sum the resources has led to perf issues (MESOS-1862). - // We keep the resources partitioned by SlaveID because non-scalar - // resources can be lost when summing them up across multiple - // slaves (MESOS-2373). - // - // Also note that keeping the totals is safe even though it yields - // incorrect results for non-scalar resources. - // (1) For overlapping set items / ranges across slaves, these - // will get added N times but only represented once. - // (2) When an initial subtraction occurs (N-1), the resource is - // no longer represented. (This is the source of the bug). - // (3) When any further subtractions occur (N-(1+M)), the - // Resources simply ignores the subtraction since there's - // nothing to remove, so this is safe for now. + Option<Authenticator*> authenticator; - // TODO(mpark): Strip the non-scalar resources out of the totals - // in order to avoid reporting incorrect statistics (MESOS-2623). + // Frameworks/slaves that are currently in the process of authentication. + // 'authenticating' future is completed when authenticator + // completes authentication. + // The future is removed from the map when master completes authentication. + hashmap<process::UPID, process::Future<Option<std::string>>> authenticating; - // Active task / executor resources. - Resources totalUsedResources; - hashmap<SlaveID, Resources> usedResources; + // Principals of authenticated frameworks/slaves keyed by PID. + hashmap<process::UPID, std::string> authenticated; - // Offered resources. - Resources totalOfferedResources; - hashmap<SlaveID, Resources> offeredResources; + int64_t nextFrameworkId; // Used to give each framework a unique ID. + int64_t nextOfferId; // Used to give each slot offer a unique ID. + int64_t nextSlaveId; // Used to give each slave a unique ID. -private: - Framework(const Framework&); // No copying. - Framework& operator = (const Framework&); // No assigning. -}; + // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is + // thread safe. + // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains + // copyable metric types only. + std::shared_ptr<Metrics> metrics; + // Gauge handlers. + double _uptime_secs() + { + return (process::Clock::now() - startTime).secs(); + } -inline std::ostream& operator << ( - std::ostream& stream, - const Framework& framework) -{ - // TODO(vinod): Also log the hostname once FrameworkInfo is properly - // updated on framework failover (MESOS-1784). - return stream << framework.id() << " (" << framework.info.name() - << ") at " << framework.pid; -} + double _elected() + { + return elected() ? 1 : 0; + } + double _slaves_connected(); + double _slaves_disconnected(); + double _slaves_active(); + double _slaves_inactive(); -// Information about an active role. -struct Role -{ - explicit Role(const mesos::master::RoleInfo& _info) - : info(_info) {} + double _frameworks_connected(); + double _frameworks_disconnected(); + double _frameworks_active(); + double _frameworks_inactive(); - void addFramework(Framework* framework) + double _outstanding_offers() { - frameworks[framework->id()] = framework; + return offers.size(); } - void removeFramework(Framework* framework) + double _event_queue_messages() { - frameworks.erase(framework->id()); + return static_cast<double>(eventCount<process::MessageEvent>()); } - Resources resources() const + double _event_queue_dispatches() { - Resources resources; - foreachvalue (Framework* framework, frameworks) { - resources += framework->totalUsedResources; - resources += framework->totalOfferedResources; - } + return static_cast<double>(eventCount<process::DispatchEvent>()); + } - return resources; + double _event_queue_http_requests() + { + return static_cast<double>(eventCount<process::HttpEvent>()); } - mesos::master::RoleInfo info; + double _tasks_staging(); + double _tasks_starting(); + double _tasks_running(); - hashmap<FrameworkID, Framework*> frameworks; + double _resources_total(const std::string& name); + double _resources_used(const std::string& name); + double _resources_percent(const std::string& name); + + process::Time startTime; // Start time used to calculate uptime. + + Option<process::Time> electedTime; // Time when this master is elected. + + // Validates the framework including authorization. + // Returns None if the framework is valid. + // Returns Error if the framework is invalid. + // Returns Failure if authorization returns 'Failure'. + process::Future<Option<Error>> validate( + const FrameworkInfo& frameworkInfo, + const process::UPID& from); };
