This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit cb8106fb7fc9282232a98b27bd1aca43c52a7f69
Author: Andrei Sekretenko <[email protected]>
AuthorDate: Mon Jan 20 19:39:19 2020 +0100

    Made `http`, `pid` and `heartbeater` of `Framework` private.
    
    This is a prerequisite to localizing mutations of connection-related
    state of `Framework` to a limited set of `Framework` methods
    in the next patch.
    
    Review: https://reviews.apache.org/r/72094
---
 src/master/framework.cpp        | 34 +++++++-------
 src/master/http.cpp             |  4 +-
 src/master/master.cpp           | 99 +++++++++++++++++++++--------------------
 src/master/master.hpp           | 41 ++++++++++-------
 src/master/readonly_handler.cpp |  4 +-
 5 files changed, 95 insertions(+), 87 deletions(-)

diff --git a/src/master/framework.cpp b/src/master/framework.cpp
index e69a7c2..a9318a9 100644
--- a/src/master/framework.cpp
+++ b/src/master/framework.cpp
@@ -27,11 +27,11 @@ Framework::Framework(
     Master* const master,
     const Flags& masterFlags,
     const FrameworkInfo& info,
-    const process::UPID& _pid,
+    const process::UPID& pid,
     const process::Time& time)
   : Framework(master, masterFlags, info, ACTIVE, time)
 {
-  pid = _pid;
+  pid_ = pid;
 }
 
 
@@ -39,11 +39,11 @@ Framework::Framework(
     Master* const master,
     const Flags& masterFlags,
     const FrameworkInfo& info,
-    const StreamingHttpConnection<v1::scheduler::Event>& _http,
+    const StreamingHttpConnection<v1::scheduler::Event>& http,
     const process::Time& time)
   : Framework(master, masterFlags, info, ACTIVE, time)
 {
-  http = _http;
+  http_ = http;
 }
 
 
@@ -89,7 +89,7 @@ Framework::Framework(
 
 Framework::~Framework()
 {
-  if (http.isSome()) {
+  if (http_.isSome()) {
     closeHttpConnection();
   }
 }
@@ -563,23 +563,23 @@ void Framework::updateConnection(const process::UPID& 
newPid)
 {
   // Cleanup the HTTP connnection if this is a downgrade from HTTP
   // to PID. Note that the connection may already be closed.
-  if (http.isSome()) {
+  if (http_.isSome()) {
     closeHttpConnection();
   }
 
   // TODO(benh): unlink(oldPid);
-  pid = newPid;
+  pid_ = newPid;
 }
 
 
 void Framework::updateConnection(
     const StreamingHttpConnection<v1::scheduler::Event>& newHttp)
 {
-  if (pid.isSome()) {
+  if (pid_.isSome()) {
     // Wipe the PID if this is an upgrade from PID to HTTP.
     // TODO(benh): unlink(oldPid);
-    pid = None();
-  } else if (http.isSome()) {
+    pid_ = None();
+  } else if (http_.isSome()) {
     // Cleanup the old HTTP connection.
     // Note that master creates a new HTTP connection for every
     // subscribe request, so 'newHttp' should always be different
@@ -587,28 +587,28 @@ void Framework::updateConnection(
     closeHttpConnection();
   }
 
-  CHECK_NONE(http);
+  CHECK_NONE(http_);
 
-  http = newHttp;
+  http_ = newHttp;
 }
 
 
 void Framework::closeHttpConnection()
 {
-  CHECK_SOME(http);
+  CHECK_SOME(http_);
 
-  if (connected() && !http->close()) {
+  if (connected() && !http_->close()) {
     LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
   }
 
-  http = None();
+  http_ = None();
   heartbeater.reset();
 }
 
 
 void Framework::heartbeat()
 {
-  CHECK_SOME(http);
+  CHECK_SOME(http_);
 
   // TODO(vinod): Make heartbeat interval configurable and include
   // this information in the SUBSCRIBED response.
@@ -619,7 +619,7 @@ void Framework::heartbeat()
       new ResponseHeartbeater<scheduler::Event, v1::scheduler::Event>(
           "framework " + stringify(info.id()),
           event,
-          http.get(),
+          http_.get(),
           DEFAULT_HEARTBEAT_INTERVAL,
           None(),
           [this, event]() {
diff --git a/src/master/http.cpp b/src/master/http.cpp
index eeaac88..81ab26a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -634,7 +634,7 @@ Future<Response> Master::Http::scheduler(
     return Forbidden("Framework is not subscribed");
   }
 
-  if (framework->http.isNone()) {
+  if (framework->http().isNone()) {
     return Forbidden("Framework is not connected via HTTP");
   }
 
@@ -645,7 +645,7 @@ Future<Response> Master::Http::scheduler(
   }
 
   const string& streamId = request.headers.at("Mesos-Stream-Id");
-  if (streamId != framework->http->streamId.toString()) {
+  if (streamId != framework->http()->streamId.toString()) {
     return BadRequest(
         "The stream ID '" + streamId + "' included in this request "
         "didn't match the stream ID currently associated with framework ID "
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 84963b4..6d45c4e 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1262,7 +1262,8 @@ void Master::exited(
     const StreamingHttpConnection<v1::scheduler::Event>& http)
 {
   foreachvalue (Framework* framework, frameworks.registered) {
-    if (framework->http.isSome() && framework->http->writer == http.writer) {
+    if (framework->http().isSome() &&
+        framework->http()->writer == http.writer) {
       CHECK_EQ(frameworkId, framework->id());
       _exited(framework);
       return;
@@ -1283,7 +1284,7 @@ void Master::exited(
 void Master::exited(const UPID& pid)
 {
   foreachvalue (Framework* framework, frameworks.registered) {
-    if (framework->pid == pid) {
+    if (framework->pid() == pid) {
       // See comments in `receive()` on why we send an error message
       // to the framework upon detecting a disconnection.
       FrameworkErrorMessage message;
@@ -2307,7 +2308,7 @@ void Master::drop(
   // of validation, it's possible that this function will be called before the
   // master validates that operations from v0 frameworks should not have their
   // ID set.
-  if (operation.has_id() && framework->http.isSome()) {
+  if (operation.has_id() && framework->http().isSome()) {
     scheduler::Event update;
     update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS);
 
@@ -2396,7 +2397,7 @@ void Master::receive(
     return;
   }
 
-  if (framework->pid != from) {
+  if (framework->pid() != from) {
     drop(from, call, "Call is not from registered framework");
     return;
   }
@@ -2825,7 +2826,7 @@ void Master::sendFrameworkUpdates(const Framework& 
framework)
     // TODO(anand): We set 'pid' to UPID() for http frameworks
     // as 'pid' was made optional in 0.24.0. In 0.25.0, we
     // no longer have to set pid here for http frameworks.
-    message.set_pid(framework.pid.getOrElse(UPID()));
+    message.set_pid(framework.pid().getOrElse(UPID()));
     message.mutable_framework_info()->CopyFrom(framework.info);
     send(slave->pid, message);
   }
@@ -2979,7 +2980,7 @@ void Master::_subscribe(
     // If we are here the framework is subscribing for the first time.
     // Check if this framework is already subscribed (because it retries).
     foreachvalue (Framework* framework, frameworks.registered) {
-      if (framework->pid == from) {
+      if (framework->pid() == from) {
         LOG(INFO) << "Framework " << *framework
                   << " already subscribed, resending acknowledgement";
 
@@ -3021,7 +3022,7 @@ void Master::_subscribe(
   // response because that would go to the framework that is already connected.
   if (frameworks.principals.contains(from)) {
     foreachvalue (Framework* framework, frameworks.registered) {
-      if (framework->pid == from && framework->id() != frameworkInfo.id()) {
+      if (framework->pid() == from && framework->id() != frameworkInfo.id()) {
         LOG(ERROR) << "Dropping SUBSCRIBE call for framework '"
                    << frameworkInfo.name() << "': " << *framework
                    << " already connected at " << from;
@@ -3073,7 +3074,7 @@ void Master::_subscribe(
     // another instance of their scheduler has reconnected.
 
     // Test for the error case first.
-    if ((framework->pid != from) && !force) {
+    if ((framework->pid() != from) && !force) {
       LOG(ERROR) << "Disallowing subscription attempt of"
                  << " framework " << *framework
                  << " because it is not expected from " << from;
@@ -3127,7 +3128,7 @@ void Master::_subscribe(
 
       // Relink to the framework. This might be necessary if the
       // framework link previously broke.
-      link(framework->pid.get());
+      link(framework->pid().get());
 
       // Reactivate the framework.
       // NOTE: We do this after recovering resources (above) so that
@@ -3249,7 +3250,7 @@ void Master::unregisterFramework(
 
   Framework* framework = getFramework(frameworkId);
   if (framework != nullptr) {
-    if (framework->pid == from) {
+    if (framework->pid() == from) {
       teardown(framework);
     } else {
       LOG(WARNING)
@@ -3276,7 +3277,7 @@ void Master::deactivateFramework(
     return;
   }
 
-  if (framework->pid != from) {
+  if (framework->pid() != from) {
     LOG(WARNING)
       << "Ignoring deactivate framework message for framework " << *framework
       << " because it is not expected from " << from;
@@ -3311,10 +3312,10 @@ void Master::disconnect(Framework* framework)
 
   framework->setFrameworkState(Framework::State::DISCONNECTED);
 
-  if (framework->pid.isSome()) {
+  if (framework->pid().isSome()) {
     // Remove the framework from authenticated. This is safe because
     // a framework will always reauthenticate before (re-)registering.
-    authenticated.erase(framework->pid.get());
+    authenticated.erase(framework->pid().get());
   } else {
     framework->closeHttpConnection();
   }
@@ -3432,7 +3433,7 @@ void Master::resourceRequest(
     return;
   }
 
-  if (framework->pid != from) {
+  if (framework->pid() != from) {
     LOG(WARNING)
       << "Ignoring resource request message from framework " << *framework
       << " because it is not expected from " << from;
@@ -3647,7 +3648,7 @@ void Master::launchTasks(
     return;
   }
 
-  if (framework->pid != from) {
+  if (framework->pid() != from) {
     LOG(WARNING)
       << "Ignoring launch tasks message for offers "
       << stringify(launchTasksMessage.offer_ids())
@@ -4116,7 +4117,7 @@ void Master::accept(
           case Offer::Operation::SHRINK_VOLUME:
           case Offer::Operation::CREATE_DISK:
           case Offer::Operation::DESTROY_DISK: {
-            if (framework->http.isNone()) {
+            if (framework->http().isNone()) {
               const string message =
                 "The 'id' field was set in an offer operation, but operation"
                 " feedback is not supported for the SchedulerDriver API";
@@ -5230,7 +5231,7 @@ void Master::_accept(
             // TODO(anand): We set 'pid' to UPID() for http frameworks
             // as 'pid' was made optional in 0.24.0. In 0.25.0, we
             // no longer have to set pid here for http frameworks.
-            message.set_pid(framework->pid.getOrElse(UPID()));
+            message.set_pid(framework->pid().getOrElse(UPID()));
             message.mutable_task()->MergeFrom(task);
 
             message.set_launch_executor(launchExecutor);
@@ -5928,7 +5929,7 @@ void Master::reviveOffers(
     return;
   }
 
-  if (framework->pid != from) {
+  if (framework->pid() != from) {
     LOG(WARNING)
       << "Ignoring revive offers message for framework " << *framework
       << " because it is not expected from " << from;
@@ -6000,7 +6001,7 @@ void Master::killTask(
     return;
   }
 
-  if (framework->pid != from) {
+  if (framework->pid() != from) {
     LOG(WARNING)
       << "Ignoring kill task message for task " << taskId << " of framework "
       << *framework << " because it is not expected from " << from;
@@ -6168,7 +6169,7 @@ void Master::statusUpdateAcknowledgement(
     return;
   }
 
-  if (framework->pid != from) {
+  if (framework->pid() != from) {
     LOG(WARNING)
       << "Ignoring status update acknowledgement for status "
       << uuid_.get() << " of task " << taskId << " of framework "
@@ -6434,7 +6435,7 @@ void Master::schedulerMessage(
     return;
   }
 
-  if (framework->pid != from) {
+  if (framework->pid() != from) {
     LOG(WARNING)
       << "Ignoring framework message for executor '" << executorId
       << "' of framework " << *framework
@@ -7797,7 +7798,7 @@ void Master::updateSlaveFrameworks(
       // TODO(anand): We set 'pid' to UPID() for http frameworks
       // as 'pid' was made optional in 0.24.0. In 0.25.0, we
       // no longer have to set pid here for http frameworks.
-      message.set_pid(framework->pid.getOrElse(UPID()));
+      message.set_pid(framework->pid().getOrElse(UPID()));
 
       send(slave->pid, message);
     } else {
@@ -9225,7 +9226,7 @@ void Master::sendBulkOperationFeedback(
     Option<Framework*> framework =
       frameworks.registered.get(operation->framework_id());
 
-    if (!framework.isSome() || !framework.get()->http.isSome()) {
+    if (!framework.isSome() || !framework.get()->http().isSome()) {
       continue;
     }
 
@@ -9268,7 +9269,7 @@ void Master::reconcileTasks(
     return;
   }
 
-  if (framework->pid != from) {
+  if (framework->pid() != from) {
     LOG(WARNING)
       << "Ignoring reconcile tasks message for framework " << *framework
       << " because it is not expected from " << from;
@@ -10388,13 +10389,13 @@ void Master::addFramework(
   frameworks.registered[framework->id()] = framework;
 
   if (framework->connected()) {
-    if (framework->pid.isSome()) {
-      link(framework->pid.get());
+    if (framework->pid().isSome()) {
+      link(framework->pid().get());
     } else {
-      CHECK_SOME(framework->http);
+      CHECK_SOME(framework->http());
 
       const StreamingHttpConnection<v1::scheduler::Event>& http =
-        framework->http.get();
+        framework->http().get();
 
       http.closed()
         .onAny(defer(self(), &Self::exited, framework->id(), http));
@@ -10417,9 +10418,9 @@ void Master::addFramework(
       ? Option<string>(framework->info.principal())
       : None();
 
-  if (framework->pid.isSome()) {
-    CHECK(!frameworks.principals.contains(framework->pid.get()));
-    frameworks.principals.put(framework->pid.get(), principal);
+  if (framework->pid().isSome()) {
+    CHECK(!frameworks.principals.contains(framework->pid().get()));
+    frameworks.principals.put(framework->pid().get(), principal);
   }
 
   if (principal.isSome()) {
@@ -10555,8 +10556,8 @@ void Master::activateRecoveredFramework(
   CHECK(framework->recovered());
   CHECK(framework->offers.empty());
   CHECK(framework->inverseOffers.empty());
-  CHECK(framework->pid.isNone());
-  CHECK(framework->http.isNone());
+  CHECK(framework->pid().isNone());
+  CHECK(framework->http().isNone());
 
   updateFramework(framework, frameworkInfo, suppressedRoles);
 
@@ -10587,9 +10588,9 @@ void Master::activateRecoveredFramework(
     ? Option<string>(framework->info.principal())
     : None();
 
-  if (framework->pid.isSome()) {
-    CHECK(!frameworks.principals.contains(framework->pid.get()));
-    frameworks.principals.put(framework->pid.get(), principal);
+  if (framework->pid().isSome()) {
+    CHECK(!frameworks.principals.contains(framework->pid().get()));
+    frameworks.principals.put(framework->pid().get(), principal);
   }
 
   // We expect the framework metrics for this principal to be created
@@ -10638,13 +10639,13 @@ void Master::failoverFramework(
   }
 
   // If this is an upgrade, clear the authentication related data.
-  if (framework->pid.isSome()) {
-    authenticated.erase(framework->pid.get());
+  if (framework->pid().isSome()) {
+    authenticated.erase(framework->pid().get());
 
-    CHECK(frameworks.principals.contains(framework->pid.get()));
-    Option<string> principal = frameworks.principals[framework->pid.get()];
+    CHECK(frameworks.principals.contains(framework->pid().get()));
+    Option<string> principal = frameworks.principals[framework->pid().get()];
 
-    frameworks.principals.erase(framework->pid.get());
+    frameworks.principals.erase(framework->pid().get());
   }
 
   framework->updateConnection(http);
@@ -10665,7 +10666,7 @@ void Master::failoverFramework(Framework* framework, 
const UPID& newPid)
 {
   CHECK_NOTNULL(framework);
 
-  const Option<UPID> oldPid = framework->pid;
+  const Option<UPID> oldPid = framework->pid();
 
   // There are a few failover cases to consider:
   //   1. The pid has changed or it was previously a HTTP based scheduler.
@@ -10688,7 +10689,7 @@ void Master::failoverFramework(Framework* framework, 
const UPID& newPid)
 
   _failoverFramework(framework);
 
-  CHECK_SOME(framework->pid);
+  CHECK_SOME(framework->pid());
 
   // Update the principal mapping for this framework, which is
   // needed to keep the per-principal framework metrics accurate.
@@ -10901,7 +10902,7 @@ void Master::removeFramework(Framework* framework)
 
   // TODO(benh): unlink(framework->pid);
 
-  if (framework->http.isSome()) {
+  if (framework->http().isSome()) {
     framework->closeHttpConnection();
   }
 
@@ -10913,13 +10914,13 @@ void Master::removeFramework(Framework* framework)
 
   // TODO(anand): This only works for pid based frameworks. We would
   // need similar authentication logic for http frameworks.
-  if (framework->pid.isSome()) {
-    authenticated.erase(framework->pid.get());
+  if (framework->pid().isSome()) {
+    authenticated.erase(framework->pid().get());
 
-    CHECK(frameworks.principals.contains(framework->pid.get()));
-    Option<string> principal = frameworks.principals[framework->pid.get()];
+    CHECK(frameworks.principals.contains(framework->pid().get()));
+    Option<string> principal = frameworks.principals[framework->pid().get()];
 
-    frameworks.principals.erase(framework->pid.get());
+    frameworks.principals.erase(framework->pid().get());
 
     // Remove the metrics for the principal if this framework is the
     // last one with this principal.
diff --git a/src/master/master.hpp b/src/master/master.hpp
index c813e9f..f1aa40f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2548,6 +2548,13 @@ struct Framework
 
   void setFrameworkState(const State& _state);
 
+  const Option<StreamingHttpConnection<v1::scheduler::Event>>& http() const
+  {
+    return http_;
+  }
+
+  const Option<process::UPID>& pid() const { return pid_; }
+
   Master* const master;
 
   FrameworkInfo info;
@@ -2556,13 +2563,6 @@ struct Framework
 
   protobuf::framework::Capabilities capabilities;
 
-  // Frameworks can either be connected via HTTP or by message passing
-  // (scheduler driver). At most one of `http` and `pid` will be set
-  // according to the last connection made by the framework; neither
-  // field will be set if the framework is in state `RECOVERED`.
-  Option<StreamingHttpConnection<v1::scheduler::Event>> http;
-  Option<process::UPID> pid;
-
   State state;
 
   process::Time registeredTime;
@@ -2640,10 +2640,6 @@ struct Framework
   Resources totalOfferedResources;
   hashmap<SlaveID, Resources> offeredResources;
 
-  // This is only set for HTTP frameworks.
-  process::Owned<ResponseHeartbeater<scheduler::Event, v1::scheduler::Event>>
-    heartbeater;
-
   // This is used for per-framework metrics.
   FrameworkMetrics metrics;
 
@@ -2656,6 +2652,17 @@ private:
 
   Framework(const Framework&);              // No copying.
   Framework& operator=(const Framework&); // No assigning.
+
+  // Frameworks can either be connected via HTTP or by message passing
+  // (scheduler driver). At most one of `http` and `pid` will be set
+  // according to the last connection made by the framework; neither
+  // field will be set if the framework is in state `RECOVERED`.
+  Option<StreamingHttpConnection<v1::scheduler::Event>> http_;
+  Option<process::UPID> pid_;
+
+  // This is only set for HTTP frameworks.
+  process::Owned<ResponseHeartbeater<scheduler::Event, v1::scheduler::Event>>
+    heartbeater;
 };
 
 
@@ -2682,13 +2689,13 @@ void Framework::send(const Message& message)
     // that one of `http` or `pid` is set if the framework is connected.
   }
 
-  if (http.isSome()) {
-    if (!http->send(message)) {
+  if (http_.isSome()) {
+    if (!http_->send(message)) {
       LOG(WARNING) << "Unable to send message to framework " << *this << ":"
                    << " connection closed";
     }
-  } else if (pid.isSome()) {
-    master->send(pid.get(), message);
+  } else if (pid().isSome()) {
+    master->send(pid().get(), message);
   } else {
     LOG(WARNING) << "Unable to send message to framework " << *this << ":"
                  << " framework is recovered but has not reregistered";
@@ -2730,8 +2737,8 @@ inline std::ostream& operator<<(
   // updated on framework failover (MESOS-1784).
   stream << framework.id() << " (" << framework.info.name() << ")";
 
-  if (framework.pid.isSome()) {
-    stream << " at " << framework.pid.get();
+  if (framework.pid().isSome()) {
+    stream << " at " << framework.pid().get();
   }
 
   return stream;
diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp
index 40005a2..341a75a 100644
--- a/src/master/readonly_handler.cpp
+++ b/src/master/readonly_handler.cpp
@@ -483,8 +483,8 @@ void json(JSON::ObjectWriter* writer, const 
Summary<Framework>& summary)
   writer->field("name", framework.info.name());
 
   // Omit pid for http frameworks.
-  if (framework.pid.isSome()) {
-    writer->field("pid", string(framework.pid.get()));
+  if (framework.pid().isSome()) {
+    writer->field("pid", string(framework.pid().get()));
   }
 
   // TODO(bmahler): Use these in the webui.

Reply via email to