This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push: new aefa4bd Introduced dedicated `Framework` methods for transitions between states. aefa4bd is described below commit aefa4bdb8a111c74f1d0f7e851a486f4e2ec47e7 Author: Andrei Sekretenko <asekrete...@mesosphere.com> AuthorDate: Tue Jan 21 19:55:24 2020 +0100 Introduced dedicated `Framework` methods for transitions between states. The main purpose of this patch is to gather the scattered logic for transitioning a `Framework` to the disconnected state into the `Framework::disconnect()` method. This is a prerequisite for adding one more entity to the `Framework` state that needs cleanup when the framework is disconnected (namely, per-framework `ObjectApprovers`, which are added in dependent patches). Additionally, this patch decouples the connection state from eligibility to receive offers: the `ACTIVE` and `INACTIVE` states are merged into `CONNECTED`, and a new boolean attribute `active` is introduced. Since `updateConnection(...)` no longer changes `active` on its own, the methods `activate()` and `deactivate()` are introduced. Note that the current behaviour of activating a reconnected framework, regardless of whether it was active before disconnecting, is not changed. Also, for consistency between the `CONNECTED`->`DISCONNECTED` transition and other state transitions, the public `setFrameworkState(...)` method is removed.
Review: https://reviews.apache.org/r/72095 --- src/master/framework.cpp | 81 +++++++++++++++++++++++++---------------- src/master/master.cpp | 67 ++++++++++++++++++---------------- src/master/master.hpp | 87 +++++++++++++++++++++----------------------- src/master/quota_handler.cpp | 2 +- 4 files changed, 129 insertions(+), 108 deletions(-) diff --git a/src/master/framework.cpp b/src/master/framework.cpp index a9318a9..85d9951 100644 --- a/src/master/framework.cpp +++ b/src/master/framework.cpp @@ -29,7 +29,7 @@ Framework::Framework( const FrameworkInfo& info, const process::UPID& pid, const process::Time& time) - : Framework(master, masterFlags, info, ACTIVE, time) + : Framework(master, masterFlags, info, CONNECTED, true, time) { pid_ = pid; } @@ -41,7 +41,7 @@ Framework::Framework( const FrameworkInfo& info, const StreamingHttpConnection<v1::scheduler::Event>& http, const process::Time& time) - : Framework(master, masterFlags, info, ACTIVE, time) + : Framework(master, masterFlags, info, CONNECTED, true, time) { http_ = http; } @@ -51,7 +51,7 @@ Framework::Framework( Master* const master, const Flags& masterFlags, const FrameworkInfo& info) - : Framework(master, masterFlags, info, RECOVERED, process::Time()) + : Framework(master, masterFlags, info, RECOVERED, false, process::Time()) {} @@ -60,21 +60,23 @@ Framework::Framework( const Flags& masterFlags, const FrameworkInfo& _info, State state, + bool active_, const process::Time& time) : master(_master), info(_info), roles(protobuf::framework::getRoles(_info)), capabilities(_info.capabilities()), - state(state), registeredTime(time), reregisteredTime(time), completedTasks(masterFlags.max_completed_tasks_per_framework), unreachableTasks(masterFlags.max_unreachable_tasks_per_framework), - metrics(_info, masterFlags.publish_per_framework_metrics) + metrics(_info, masterFlags.publish_per_framework_metrics), + active_(active_), + state(state) { CHECK(_info.has_id()); - setFrameworkState(state); + setState(state); foreach 
(const std::string& role, roles) { // NOTE: It's possible that we're already being tracked under the role @@ -89,9 +91,7 @@ Framework::Framework( Framework::~Framework() { - if (http_.isSome()) { - closeHttpConnection(); - } + disconnect(); } @@ -561,48 +561,67 @@ void Framework::update(const FrameworkInfo& newInfo) void Framework::updateConnection(const process::UPID& newPid) { - // Cleanup the HTTP connnection if this is a downgrade from HTTP - // to PID. Note that the connection may already be closed. - if (http_.isSome()) { - closeHttpConnection(); - } + // Cleanup the old connection state if exists. + disconnect(); + CHECK_NONE(http_); // TODO(benh): unlink(oldPid); pid_ = newPid; + setState(State::CONNECTED); } void Framework::updateConnection( const StreamingHttpConnection<v1::scheduler::Event>& newHttp) { - if (pid_.isSome()) { - // Wipe the PID if this is an upgrade from PID to HTTP. - // TODO(benh): unlink(oldPid); - pid_ = None(); - } else if (http_.isSome()) { - // Cleanup the old HTTP connection. - // Note that master creates a new HTTP connection for every - // subscribe request, so 'newHttp' should always be different - // from 'http'. - closeHttpConnection(); - } + // Note that master creates a new HTTP connection for every + // subscribe request, so 'newHttp' should always be different + // from 'http'. + CHECK(http_.isNone() || newHttp.writer != http_->writer); - CHECK_NONE(http_); + // Cleanup the old connection state if exists. + disconnect(); + // TODO(benh): unlink(oldPid) if this is an upgrade from PID to HTTP. 
+ pid_ = None(); + + CHECK_NONE(http_); http_ = newHttp; + setState(State::CONNECTED); } -void Framework::closeHttpConnection() +bool Framework::activate() { - CHECK_SOME(http_); + bool noop = active_; + active_ = true; + return !noop; +} + + +bool Framework::deactivate() +{ + bool noop = !active_; + active_ = false; + return !noop; +} + + +bool Framework::disconnect() +{ + if (state != State::CONNECTED) { + CHECK(http_.isNone()); + return false; + } - if (connected() && !http_->close()) { + if (http_.isSome() && connected() && !http_->close()) { LOG(WARNING) << "Failed to close HTTP pipe for " << *this; } http_ = None(); heartbeater.reset(); + setState(State::DISCONNECTED); + return true; } @@ -678,10 +697,10 @@ void Framework::untrackUnderRole(const std::string& role) } -void Framework::setFrameworkState(const Framework::State& _state) +void Framework::setState(Framework::State _state) { state = _state; - metrics.subscribed = state == Framework::State::ACTIVE ? 1 : 0; + metrics.subscribed = state == Framework::State::CONNECTED ? 1 : 0; } } // namespace master { diff --git a/src/master/master.cpp b/src/master/master.cpp index 36a81cc..3c621e4 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2801,7 +2801,7 @@ void Master::_subscribe( failoverFramework(framework, http); } else { // The framework has not yet reregistered after master failover. - activateRecoveredFramework( + connectAndActivateRecoveredFramework( framework, frameworkInfo, None(), http, suppressedRoles); } @@ -3130,11 +3130,12 @@ void Master::_subscribe( // framework link previously broke. link(framework->pid().get()); - // Reactivate the framework. - // NOTE: We do this after recovering resources (above) so that - // the allocator has the correct view of the framework's share. 
- if (!framework->active()) { - framework->setFrameworkState(Framework::State::ACTIVE); + framework->updateConnection(*(framework->pid())); + if (framework->activate()) { + // The framework was not active and needs to be activated in allocator. + // + // NOTE: We do this after recovering resources (above) so that + // the allocator has the correct view of the framework's share. allocator->activateFramework(framework->id()); } @@ -3145,7 +3146,7 @@ void Master::_subscribe( } } else { // The framework has not yet reregistered after master failover. - activateRecoveredFramework( + connectAndActivateRecoveredFramework( framework, frameworkInfo, from, None(), suppressedRoles); } @@ -3310,26 +3311,23 @@ void Master::disconnect(Framework* framework) LOG(INFO) << "Disconnecting framework " << *framework; - framework->setFrameworkState(Framework::State::DISCONNECTED); - if (framework->pid().isSome()) { // Remove the framework from authenticated. This is safe because // a framework will always reauthenticate before (re-)registering. authenticated.erase(framework->pid().get()); - } else { - framework->closeHttpConnection(); } + + CHECK(framework->disconnect()); } void Master::deactivate(Framework* framework, bool rescind) { CHECK_NOTNULL(framework); - CHECK(framework->active()); LOG(INFO) << "Deactivating framework " << *framework; - framework->setFrameworkState(Framework::State::INACTIVE); + CHECK(framework->deactivate()); // Tell the allocator to stop allocating resources to this framework. 
allocator->deactivateFramework(framework->id()); @@ -9779,10 +9777,12 @@ void Master::offer( { Framework* framework = getFramework(frameworkId); - if (framework == nullptr || !framework->active()) { + if (framework == nullptr || + !framework->connected() || + !framework->active()) { LOG(WARNING) << "Master returning resources offered to framework " << frameworkId << " because the framework" - << " has terminated or is inactive"; + << " has terminated, is not connected, or is inactive"; foreachkey (const string& role, resources) { foreachpair (const SlaveID& slaveId, @@ -9989,18 +9989,24 @@ void Master::inverseOffer( const FrameworkID& frameworkId, const hashmap<SlaveID, UnavailableResources>& resources) { - if (!frameworks.registered.contains(frameworkId) || - !frameworks.registered[frameworkId]->active()) { + if (!frameworks.registered.contains(frameworkId)) { LOG(INFO) << "Master ignoring inverse offers to framework " << frameworkId - << " because the framework has terminated or is inactive"; + << " because the framework has terminated"; + return; + } + + Framework* framework = CHECK_NOTNULL(frameworks.registered.at(frameworkId)); + if (!framework->connected() || !framework->active()) { + LOG(INFO) << "Master ignoring inverse offers to framework " << frameworkId + << " because the framework is " + << (framework->active() ? "not connected" : "inactive"); return; } + // Create an inverse offer for each slave and add it to the message. 
InverseOffersMessage message; - - Framework* framework = CHECK_NOTNULL(frameworks.registered[frameworkId]); foreachpair (const SlaveID& slaveId, const UnavailableResources& unavailableResources, resources) { @@ -10552,7 +10558,7 @@ void Master::recoverFramework( } -void Master::activateRecoveredFramework( +void Master::connectAndActivateRecoveredFramework( Framework* framework, const FrameworkInfo& frameworkInfo, const Option<UPID>& pid, @@ -10589,8 +10595,9 @@ void Master::activateRecoveredFramework( .onAny(defer(self(), &Self::exited, framework->id(), http.get())); } - // Activate the framework. - framework->setFrameworkState(Framework::State::ACTIVE); + CHECK(framework->activate()) + << "RECOVERED framework is expected not to be active"; + allocator->activateFramework(framework->id()); // Export framework metrics if a principal is specified in `FrameworkInfo`. @@ -10733,11 +10740,11 @@ void Master::_failoverFramework(Framework* framework) CHECK(!framework->recovered()); - // Reactivate the framework, if needed. - // NOTE: We do this after recovering resources (above) so that - // the allocator has the correct view of the framework's share. - if (!framework->active()) { - framework->setFrameworkState(Framework::State::ACTIVE); + if (framework->activate()) { + // The framework was inactive and needs to be activated in the allocator. + // + // NOTE: We do this after recovering resources (above) so that + // the allocator has the correct view of the framework's share. allocator->activateFramework(framework->id()); } @@ -10912,9 +10919,7 @@ void Master::removeFramework(Framework* framework) // TODO(benh): unlink(framework->pid); - if (framework->http().isSome()) { - framework->closeHttpConnection(); - } + framework->disconnect(); framework->unregisteredTime = Clock::now(); diff --git a/src/master/master.hpp b/src/master/master.hpp index d774d77..f3239cd 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -635,7 +635,7 @@ protected: // activate it. 
This happens at most once after master failover, the // first time that the framework reregisters with the new master. // Exactly one of `newPid` or `http` must be provided. - void activateRecoveredFramework( + void connectAndActivateRecoveredFramework( Framework* framework, const FrameworkInfo& frameworkInfo, const Option<process::UPID>& pid, @@ -2427,23 +2427,21 @@ struct Framework // agents that are running tasks for the framework. RECOVERED, - // Framework was previously connected to this master. A framework - // becomes disconnected when there is a socket error. - DISCONNECTED, + // The framework is connected. The framework may or may not be eligible to + // receive offers; this property is tracked separately. + CONNECTED, - // The framework is connected but not active. - INACTIVE, - - // Framework is connected and eligible to receive offers. No - // offers will be made to frameworks that are not active. - ACTIVE + // Framework was previously connected to this master, + // but is not connected now. + DISCONNECTED }; - Framework(Master* const master, - const Flags& masterFlags, - const FrameworkInfo& info, - const process::UPID& _pid, - const process::Time& time = process::Clock::now()); + Framework( + Master* const master, + const Flags& masterFlags, + const FrameworkInfo& info, + const process::UPID& _pid, + const process::Time& time = process::Clock::now()); Framework(Master* const master, const Flags& masterFlags, @@ -2515,29 +2513,34 @@ struct Framework // 'webui_url', 'capabilities', and 'labels'. void update(const FrameworkInfo& newInfo); + // Reactivate framework with new connection: update connection-related state + // and mark the framework as CONNECTED, regardless of the previous state. void updateConnection(const process::UPID& newPid); - void updateConnection( const StreamingHttpConnection<v1::scheduler::Event>& newHttp); - // Closes the HTTP connection and stops the heartbeat. 
- // - // TODO(vinod): Currently `state` variable is set separately - // from this method. We need to make sure these are in sync. - void closeHttpConnection(); + // If the framework is CONNECTED, clear all state associated with + // the scheduler being connected (close http connection, stop heartbeater, + // etc.), mark the framework DISCONNECTED and return `true`. + // Otherwise, return `false`. + bool disconnect(); + + // Mark the framework as active (eligible to receive offers if connected) + // or inactive. Returns true if this property changed, false otherwise. + bool activate(); + bool deactivate(); void heartbeat(); - bool active() const; - bool connected() const; - bool recovered() const; + bool active() const { return active_; } + + bool connected() const {return state == State::CONNECTED;} + bool recovered() const {return state == State::RECOVERED;} bool isTrackedUnderRole(const std::string& role) const; void trackUnderRole(const std::string& role); void untrackUnderRole(const std::string& role); - void setFrameworkState(const State& _state); - const Option<StreamingHttpConnection<v1::scheduler::Event>>& http() const { return http_; @@ -2553,8 +2556,6 @@ struct Framework protobuf::framework::Capabilities capabilities; - State state; - process::Time registeredTime; process::Time reregisteredTime; process::Time unregisteredTime; @@ -2638,11 +2639,24 @@ private: const Flags& masterFlags, const FrameworkInfo& _info, State state, + bool active, const process::Time& time); Framework(const Framework&); // No copying. Framework& operator=(const Framework&); // No assigning. + // Indicates whether this framework should be receiving offers + // when it is connected. + bool active_; + + // NOTE: `state` should never modified by means other than `setState()`. + // + // TODO(asekretenko): Encapsulate `state` to ensure that `metrics.subscribed` + // is updated together with any `state` change. 
+ State state; + + void setState(State state_); + // Frameworks can either be connected via HTTP or by message passing // (scheduler driver). At most one of `http` and `pid` will be set // according to the last connection made by the framework; neither @@ -2701,23 +2715,6 @@ inline const FrameworkID Framework::id() const } -inline bool Framework::active() const -{ - return state == ACTIVE; -} - - -inline bool Framework::connected() const -{ - return state == ACTIVE || state == INACTIVE; -} - - -inline bool Framework::recovered() const -{ - return state == RECOVERED; -} - inline std::ostream& operator<<( std::ostream& stream, diff --git a/src/master/quota_handler.cpp b/src/master/quota_handler.cpp index ea3f858..e7f3881 100644 --- a/src/master/quota_handler.cpp +++ b/src/master/quota_handler.cpp @@ -247,7 +247,7 @@ void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const if (master->roles.contains(role)) { Role* roleState = master->roles.at(role); foreachvalue (const Framework* framework, roleState->frameworks) { - if (framework->active()) { + if (framework->connected() && framework->active()) { ++frameworksInRole; } }