This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new aefa4bd  Introduced dedicated `Framework` methods for transitions between states.
aefa4bd is described below

commit aefa4bdb8a111c74f1d0f7e851a486f4e2ec47e7
Author: Andrei Sekretenko <asekrete...@mesosphere.com>
AuthorDate: Tue Jan 21 19:55:24 2020 +0100

    Introduced dedicated `Framework` methods for transitions between states.
    
    The main purpose of this patch is gathering scattered logic of
    transitioning `Framework` to disconnected state into
    `Framework::disconnect()` method. This is a prerequisite for adding
    to the `Framework` state one more entity that needs cleanup when the
    framework is disconnected (namely, adding per-framework
    `ObjectApprovers` in dependent patches).
    
    Additionally, this patch decouples connection state from eligibility
    to receive offers: `ACTIVE` and `INACTIVE` states are merged into
    `CONNECTED`, and a new boolean attribute `active` is introduced.
    Now that `updateConnection(...)` does not change `active` on its own,
    methods `activate()` and `deactivate()` are introduced.
    
    Note that the current behaviour of activating a reconnected framework
    regardless of whether it was active before disconnecting is not changed.
    
    Also, for consistency between the `CONNECTED`->`DISCONNECTED`
    transition and other state transitions, the public
    `setFrameworkState(...)` method is removed.
    
    Review: https://reviews.apache.org/r/72095
---
 src/master/framework.cpp     | 81 +++++++++++++++++++++++++----------------
 src/master/master.cpp        | 67 ++++++++++++++++++----------------
 src/master/master.hpp        | 87 +++++++++++++++++++++-----------------------
 src/master/quota_handler.cpp |  2 +-
 4 files changed, 129 insertions(+), 108 deletions(-)

diff --git a/src/master/framework.cpp b/src/master/framework.cpp
index a9318a9..85d9951 100644
--- a/src/master/framework.cpp
+++ b/src/master/framework.cpp
@@ -29,7 +29,7 @@ Framework::Framework(
     const FrameworkInfo& info,
     const process::UPID& pid,
     const process::Time& time)
-  : Framework(master, masterFlags, info, ACTIVE, time)
+  : Framework(master, masterFlags, info, CONNECTED, true, time)
 {
   pid_ = pid;
 }
@@ -41,7 +41,7 @@ Framework::Framework(
     const FrameworkInfo& info,
     const StreamingHttpConnection<v1::scheduler::Event>& http,
     const process::Time& time)
-  : Framework(master, masterFlags, info, ACTIVE, time)
+  : Framework(master, masterFlags, info, CONNECTED, true, time)
 {
   http_ = http;
 }
@@ -51,7 +51,7 @@ Framework::Framework(
     Master* const master,
     const Flags& masterFlags,
     const FrameworkInfo& info)
-  : Framework(master, masterFlags, info, RECOVERED, process::Time())
+  : Framework(master, masterFlags, info, RECOVERED, false, process::Time())
 {}
 
 
@@ -60,21 +60,23 @@ Framework::Framework(
     const Flags& masterFlags,
     const FrameworkInfo& _info,
     State state,
+    bool active_,
     const process::Time& time)
   : master(_master),
     info(_info),
     roles(protobuf::framework::getRoles(_info)),
     capabilities(_info.capabilities()),
-    state(state),
     registeredTime(time),
     reregisteredTime(time),
     completedTasks(masterFlags.max_completed_tasks_per_framework),
     unreachableTasks(masterFlags.max_unreachable_tasks_per_framework),
-    metrics(_info, masterFlags.publish_per_framework_metrics)
+    metrics(_info, masterFlags.publish_per_framework_metrics),
+    active_(active_),
+    state(state)
 {
   CHECK(_info.has_id());
 
-  setFrameworkState(state);
+  setState(state);
 
   foreach (const std::string& role, roles) {
     // NOTE: It's possible that we're already being tracked under the role
@@ -89,9 +91,7 @@ Framework::Framework(
 
 Framework::~Framework()
 {
-  if (http_.isSome()) {
-    closeHttpConnection();
-  }
+  disconnect();
 }
 
 
@@ -561,48 +561,67 @@ void Framework::update(const FrameworkInfo& newInfo)
 
 void Framework::updateConnection(const process::UPID& newPid)
 {
-  // Cleanup the HTTP connnection if this is a downgrade from HTTP
-  // to PID. Note that the connection may already be closed.
-  if (http_.isSome()) {
-    closeHttpConnection();
-  }
+  // Clean up the old connection state, if any.
+  disconnect();
+  CHECK_NONE(http_);
 
   // TODO(benh): unlink(oldPid);
   pid_ = newPid;
+  setState(State::CONNECTED);
 }
 
 
 void Framework::updateConnection(
     const StreamingHttpConnection<v1::scheduler::Event>& newHttp)
 {
-  if (pid_.isSome()) {
-    // Wipe the PID if this is an upgrade from PID to HTTP.
-    // TODO(benh): unlink(oldPid);
-    pid_ = None();
-  } else if (http_.isSome()) {
-    // Cleanup the old HTTP connection.
-    // Note that master creates a new HTTP connection for every
-    // subscribe request, so 'newHttp' should always be different
-    // from 'http'.
-    closeHttpConnection();
-  }
+  // Note that master creates a new HTTP connection for every
+  // subscribe request, so 'newHttp' should always be different
+  // from 'http'.
+  CHECK(http_.isNone() || newHttp.writer != http_->writer);
 
-  CHECK_NONE(http_);
+  // Clean up the old connection state, if any.
+  disconnect();
 
+  // TODO(benh): unlink(oldPid) if this is an upgrade from PID to HTTP.
+  pid_ = None();
+
+  CHECK_NONE(http_);
   http_ = newHttp;
+  setState(State::CONNECTED);
 }
 
 
-void Framework::closeHttpConnection()
+bool Framework::activate()
 {
-  CHECK_SOME(http_);
+  bool noop = active_;
+  active_ = true;
+  return !noop;
+}
+
+
+bool Framework::deactivate()
+{
+  bool noop = !active_;
+  active_ = false;
+  return !noop;
+}
+
+
+bool Framework::disconnect()
+{
+  if (state != State::CONNECTED) {
+    CHECK(http_.isNone());
+    return false;
+  }
 
-  if (connected() && !http_->close()) {
+  if (http_.isSome() && connected() && !http_->close()) {
     LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
   }
 
   http_ = None();
   heartbeater.reset();
+  setState(State::DISCONNECTED);
+  return true;
 }
 
 
@@ -678,10 +697,10 @@ void Framework::untrackUnderRole(const std::string& role)
 }
 
 
-void Framework::setFrameworkState(const Framework::State& _state)
+void Framework::setState(Framework::State _state)
 {
   state = _state;
-  metrics.subscribed = state == Framework::State::ACTIVE ? 1 : 0;
+  metrics.subscribed = state == Framework::State::CONNECTED ? 1 : 0;
 }
 
 } // namespace master {
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 36a81cc..3c621e4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2801,7 +2801,7 @@ void Master::_subscribe(
     failoverFramework(framework, http);
   } else {
     // The framework has not yet reregistered after master failover.
-    activateRecoveredFramework(
+    connectAndActivateRecoveredFramework(
         framework, frameworkInfo, None(), http, suppressedRoles);
   }
 
@@ -3130,11 +3130,12 @@ void Master::_subscribe(
       // framework link previously broke.
       link(framework->pid().get());
 
-      // Reactivate the framework.
-      // NOTE: We do this after recovering resources (above) so that
-      // the allocator has the correct view of the framework's share.
-      if (!framework->active()) {
-        framework->setFrameworkState(Framework::State::ACTIVE);
+      framework->updateConnection(*(framework->pid()));
+      if (framework->activate()) {
+        // The framework was not active and needs to be activated in allocator.
+        //
+        // NOTE: We do this after recovering resources (above) so that
+        // the allocator has the correct view of the framework's share.
         allocator->activateFramework(framework->id());
       }
 
@@ -3145,7 +3146,7 @@ void Master::_subscribe(
     }
   } else {
     // The framework has not yet reregistered after master failover.
-    activateRecoveredFramework(
+    connectAndActivateRecoveredFramework(
         framework, frameworkInfo, from, None(), suppressedRoles);
   }
 
@@ -3310,26 +3311,23 @@ void Master::disconnect(Framework* framework)
 
   LOG(INFO) << "Disconnecting framework " << *framework;
 
-  framework->setFrameworkState(Framework::State::DISCONNECTED);
-
   if (framework->pid().isSome()) {
     // Remove the framework from authenticated. This is safe because
     // a framework will always reauthenticate before (re-)registering.
     authenticated.erase(framework->pid().get());
-  } else {
-    framework->closeHttpConnection();
   }
+
+  CHECK(framework->disconnect());
 }
 
 
 void Master::deactivate(Framework* framework, bool rescind)
 {
   CHECK_NOTNULL(framework);
-  CHECK(framework->active());
 
   LOG(INFO) << "Deactivating framework " << *framework;
 
-  framework->setFrameworkState(Framework::State::INACTIVE);
+  CHECK(framework->deactivate());
 
   // Tell the allocator to stop allocating resources to this framework.
   allocator->deactivateFramework(framework->id());
@@ -9779,10 +9777,12 @@ void Master::offer(
 {
   Framework* framework = getFramework(frameworkId);
 
-  if (framework == nullptr || !framework->active()) {
+  if (framework == nullptr ||
+      !framework->connected() ||
+      !framework->active()) {
     LOG(WARNING) << "Master returning resources offered to framework "
                  << frameworkId << " because the framework"
-                 << " has terminated or is inactive";
+                 << " has terminated, is not connected, or is inactive";
 
     foreachkey (const string& role, resources) {
       foreachpair (const SlaveID& slaveId,
@@ -9989,18 +9989,24 @@ void Master::inverseOffer(
     const FrameworkID& frameworkId,
     const hashmap<SlaveID, UnavailableResources>& resources)
 {
-  if (!frameworks.registered.contains(frameworkId) ||
-      !frameworks.registered[frameworkId]->active()) {
+  if (!frameworks.registered.contains(frameworkId)) {
     LOG(INFO) << "Master ignoring inverse offers to framework " << frameworkId
-              << " because the framework has terminated or is inactive";
+              << " because the framework has terminated";
+    return;
+  }
+
+  Framework* framework = CHECK_NOTNULL(frameworks.registered.at(frameworkId));
 
+  if (!framework->connected() || !framework->active()) {
+    LOG(INFO) << "Master ignoring inverse offers to framework " << frameworkId
+              << " because the framework is "
+              << (framework->active() ? "not connected" : "inactive");
     return;
   }
 
+
   // Create an inverse offer for each slave and add it to the message.
   InverseOffersMessage message;
-
-  Framework* framework = CHECK_NOTNULL(frameworks.registered[frameworkId]);
   foreachpair (const SlaveID& slaveId,
                const UnavailableResources& unavailableResources,
                resources) {
@@ -10552,7 +10558,7 @@ void Master::recoverFramework(
 }
 
 
-void Master::activateRecoveredFramework(
+void Master::connectAndActivateRecoveredFramework(
     Framework* framework,
     const FrameworkInfo& frameworkInfo,
     const Option<UPID>& pid,
@@ -10589,8 +10595,9 @@ void Master::activateRecoveredFramework(
       .onAny(defer(self(), &Self::exited, framework->id(), http.get()));
   }
 
-  // Activate the framework.
-  framework->setFrameworkState(Framework::State::ACTIVE);
+  CHECK(framework->activate())
+    << "RECOVERED framework is expected not to be active";
+
   allocator->activateFramework(framework->id());
 
   // Export framework metrics if a principal is specified in `FrameworkInfo`.
@@ -10733,11 +10740,11 @@ void Master::_failoverFramework(Framework* framework)
 
   CHECK(!framework->recovered());
 
-  // Reactivate the framework, if needed.
-  // NOTE: We do this after recovering resources (above) so that
-  // the allocator has the correct view of the framework's share.
-  if (!framework->active()) {
-    framework->setFrameworkState(Framework::State::ACTIVE);
+  if (framework->activate()) {
+    // The framework was inactive and needs to be activated in the allocator.
+    //
+    // NOTE: We do this after recovering resources (above) so that
+    // the allocator has the correct view of the framework's share.
     allocator->activateFramework(framework->id());
   }
 
@@ -10912,9 +10919,7 @@ void Master::removeFramework(Framework* framework)
 
   // TODO(benh): unlink(framework->pid);
 
-  if (framework->http().isSome()) {
-    framework->closeHttpConnection();
-  }
+  framework->disconnect();
 
   framework->unregisteredTime = Clock::now();
 
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d774d77..f3239cd 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -635,7 +635,7 @@ protected:
   // activate it. This happens at most once after master failover, the
   // first time that the framework reregisters with the new master.
   // Exactly one of `newPid` or `http` must be provided.
-  void activateRecoveredFramework(
+  void connectAndActivateRecoveredFramework(
       Framework* framework,
       const FrameworkInfo& frameworkInfo,
       const Option<process::UPID>& pid,
@@ -2427,23 +2427,21 @@ struct Framework
     // agents that are running tasks for the framework.
     RECOVERED,
 
-    // Framework was previously connected to this master. A framework
-    // becomes disconnected when there is a socket error.
-    DISCONNECTED,
+    // The framework is connected. The framework may or may not be eligible to
+    // receive offers; this property is tracked separately.
+    CONNECTED,
 
-    // The framework is connected but not active.
-    INACTIVE,
-
-    // Framework is connected and eligible to receive offers. No
-    // offers will be made to frameworks that are not active.
-    ACTIVE
+    // Framework was previously connected to this master,
+    // but is not connected now.
+    DISCONNECTED
   };
 
-  Framework(Master* const master,
-            const Flags& masterFlags,
-            const FrameworkInfo& info,
-            const process::UPID& _pid,
-            const process::Time& time = process::Clock::now());
+  Framework(
+      Master* const master,
+      const Flags& masterFlags,
+      const FrameworkInfo& info,
+      const process::UPID& _pid,
+      const process::Time& time = process::Clock::now());
 
   Framework(Master* const master,
             const Flags& masterFlags,
@@ -2515,29 +2513,34 @@ struct Framework
   // 'webui_url', 'capabilities', and 'labels'.
   void update(const FrameworkInfo& newInfo);
 
+  // Reconnect framework with new connection: update connection-related state
+  // and mark it CONNECTED regardless of previous state; `active` is unchanged.
   void updateConnection(const process::UPID& newPid);
-
   void updateConnection(
       const StreamingHttpConnection<v1::scheduler::Event>& newHttp);
 
-  // Closes the HTTP connection and stops the heartbeat.
-  //
-  // TODO(vinod): Currently `state` variable is set separately
-  // from this method. We need to make sure these are in sync.
-  void closeHttpConnection();
+  // If the framework is CONNECTED, clear all state associated with
+  // the scheduler being connected (close http connection, stop heartbeater,
+  // etc.), mark the framework DISCONNECTED and return `true`.
+  // Otherwise, return `false`.
+  bool disconnect();
+
+  // Mark the framework as active (eligible to receive offers if connected)
+  // or inactive. Returns true if this property changed, false otherwise.
+  bool activate();
+  bool deactivate();
 
   void heartbeat();
 
-  bool active() const;
-  bool connected() const;
-  bool recovered() const;
+  bool active() const { return active_; }
+
+  bool connected() const {return state == State::CONNECTED;}
+  bool recovered() const {return state == State::RECOVERED;}
 
   bool isTrackedUnderRole(const std::string& role) const;
   void trackUnderRole(const std::string& role);
   void untrackUnderRole(const std::string& role);
 
-  void setFrameworkState(const State& _state);
-
   const Option<StreamingHttpConnection<v1::scheduler::Event>>& http() const
   {
     return http_;
@@ -2553,8 +2556,6 @@ struct Framework
 
   protobuf::framework::Capabilities capabilities;
 
-  State state;
-
   process::Time registeredTime;
   process::Time reregisteredTime;
   process::Time unregisteredTime;
@@ -2638,11 +2639,24 @@ private:
             const Flags& masterFlags,
             const FrameworkInfo& _info,
             State state,
+            bool active,
             const process::Time& time);
 
   Framework(const Framework&);              // No copying.
   Framework& operator=(const Framework&); // No assigning.
 
+  // Indicates whether this framework should be receiving offers
+  // when it is connected.
+  bool active_;
+
+  // NOTE: `state` should never be modified by means other than `setState()`.
+  //
+  // TODO(asekretenko): Encapsulate `state` to ensure that `metrics.subscribed`
+  // is updated together with any `state` change.
+  State state;
+
+  void setState(State state_);
+
   // Frameworks can either be connected via HTTP or by message passing
   // (scheduler driver). At most one of `http` and `pid` will be set
   // according to the last connection made by the framework; neither
@@ -2701,23 +2715,6 @@ inline const FrameworkID Framework::id() const
 }
 
 
-inline bool Framework::active() const
-{
-  return state == ACTIVE;
-}
-
-
-inline bool Framework::connected() const
-{
-  return state == ACTIVE || state == INACTIVE;
-}
-
-
-inline bool Framework::recovered() const
-{
-  return state == RECOVERED;
-}
-
 
 inline std::ostream& operator<<(
     std::ostream& stream,
diff --git a/src/master/quota_handler.cpp b/src/master/quota_handler.cpp
index ea3f858..e7f3881 100644
--- a/src/master/quota_handler.cpp
+++ b/src/master/quota_handler.cpp
@@ -247,7 +247,7 @@ void Master::QuotaHandler::rescindOffers(const QuotaInfo& request) const
   if (master->roles.contains(role)) {
     Role* roleState = master->roles.at(role);
     foreachvalue (const Framework* framework, roleState->frameworks) {
-      if (framework->active()) {
+      if (framework->connected() && framework->active()) {
         ++frameworksInRole;
       }
     }

Reply via email to