This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit cb8106fb7fc9282232a98b27bd1aca43c52a7f69 Author: Andrei Sekretenko <[email protected]> AuthorDate: Mon Jan 20 19:39:19 2020 +0100 Made `http`, `pid` and `heartbeater` of `Framework` private. This is a prerequisite to localizing mutations of connection-related state of `Framework` to a limited set of `Framework` methods in the next patch. Review: https://reviews.apache.org/r/72094 --- src/master/framework.cpp | 34 +++++++------- src/master/http.cpp | 4 +- src/master/master.cpp | 99 +++++++++++++++++++++-------------------- src/master/master.hpp | 41 ++++++++++------- src/master/readonly_handler.cpp | 4 +- 5 files changed, 95 insertions(+), 87 deletions(-) diff --git a/src/master/framework.cpp b/src/master/framework.cpp index e69a7c2..a9318a9 100644 --- a/src/master/framework.cpp +++ b/src/master/framework.cpp @@ -27,11 +27,11 @@ Framework::Framework( Master* const master, const Flags& masterFlags, const FrameworkInfo& info, - const process::UPID& _pid, + const process::UPID& pid, const process::Time& time) : Framework(master, masterFlags, info, ACTIVE, time) { - pid = _pid; + pid_ = pid; } @@ -39,11 +39,11 @@ Framework::Framework( Master* const master, const Flags& masterFlags, const FrameworkInfo& info, - const StreamingHttpConnection<v1::scheduler::Event>& _http, + const StreamingHttpConnection<v1::scheduler::Event>& http, const process::Time& time) : Framework(master, masterFlags, info, ACTIVE, time) { - http = _http; + http_ = http; } @@ -89,7 +89,7 @@ Framework::Framework( Framework::~Framework() { - if (http.isSome()) { + if (http_.isSome()) { closeHttpConnection(); } } @@ -563,23 +563,23 @@ void Framework::updateConnection(const process::UPID& newPid) { // Cleanup the HTTP connnection if this is a downgrade from HTTP // to PID. Note that the connection may already be closed. - if (http.isSome()) { + if (http_.isSome()) { closeHttpConnection(); } // TODO(benh): unlink(oldPid); - pid = newPid; + pid_ = newPid; } void Framework::updateConnection( const StreamingHttpConnection<v1::scheduler::Event>& newHttp) { - if (pid.isSome()) { + if (pid_.isSome()) { // Wipe the PID if this is an upgrade from PID to HTTP. // TODO(benh): unlink(oldPid); - pid = None(); - } else if (http.isSome()) { + pid_ = None(); + } else if (http_.isSome()) { // Cleanup the old HTTP connection. // Note that master creates a new HTTP connection for every // subscribe request, so 'newHttp' should always be different @@ -587,28 +587,28 @@ void Framework::updateConnection( closeHttpConnection(); } - CHECK_NONE(http); + CHECK_NONE(http_); - http = newHttp; + http_ = newHttp; } void Framework::closeHttpConnection() { - CHECK_SOME(http); + CHECK_SOME(http_); - if (connected() && !http->close()) { + if (connected() && !http_->close()) { LOG(WARNING) << "Failed to close HTTP pipe for " << *this; } - http = None(); + http_ = None(); heartbeater.reset(); } void Framework::heartbeat() { - CHECK_SOME(http); + CHECK_SOME(http_); // TODO(vinod): Make heartbeat interval configurable and include // this information in the SUBSCRIBED response. @@ -619,7 +619,7 @@ void Framework::heartbeat() new ResponseHeartbeater<scheduler::Event, v1::scheduler::Event>( "framework " + stringify(info.id()), event, - http.get(), + http_.get(), DEFAULT_HEARTBEAT_INTERVAL, None(), [this, event]() { diff --git a/src/master/http.cpp b/src/master/http.cpp index eeaac88..81ab26a 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -634,7 +634,7 @@ Future<Response> Master::Http::scheduler( return Forbidden("Framework is not subscribed"); } - if (framework->http.isNone()) { + if (framework->http().isNone()) { return Forbidden("Framework is not connected via HTTP"); } @@ -645,7 +645,7 @@ Future<Response> Master::Http::scheduler( } const string& streamId = request.headers.at("Mesos-Stream-Id"); - if (streamId != framework->http->streamId.toString()) { + if (streamId != framework->http()->streamId.toString()) { return BadRequest( "The stream ID '" + streamId + "' included in this request " "didn't match the stream ID currently associated with framework ID " diff --git a/src/master/master.cpp b/src/master/master.cpp index 84963b4..6d45c4e 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1262,7 +1262,8 @@ void Master::exited( const StreamingHttpConnection<v1::scheduler::Event>& http) { foreachvalue (Framework* framework, frameworks.registered) { - if (framework->http.isSome() && framework->http->writer == http.writer) { + if (framework->http().isSome() && + framework->http()->writer == http.writer) { CHECK_EQ(frameworkId, framework->id()); _exited(framework); return; @@ -1283,7 +1284,7 @@ void Master::exited( void Master::exited(const UPID& pid) { foreachvalue (Framework* framework, frameworks.registered) { - if (framework->pid == pid) { + if (framework->pid() == pid) { // See comments in `receive()` on why we send an error message // to the framework upon detecting a disconnection. FrameworkErrorMessage message; @@ -2307,7 +2308,7 @@ void Master::drop( // of validation, it's possible that this function will be called before the // master validates that operations from v0 frameworks should not have their // ID set. - if (operation.has_id() && framework->http.isSome()) { + if (operation.has_id() && framework->http().isSome()) { scheduler::Event update; update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS); @@ -2396,7 +2397,7 @@ void Master::receive( return; } - if (framework->pid != from) { + if (framework->pid() != from) { drop(from, call, "Call is not from registered framework"); return; } @@ -2825,7 +2826,7 @@ void Master::sendFrameworkUpdates(const Framework& framework) // TODO(anand): We set 'pid' to UPID() for http frameworks // as 'pid' was made optional in 0.24.0. In 0.25.0, we // no longer have to set pid here for http frameworks. - message.set_pid(framework.pid.getOrElse(UPID())); + message.set_pid(framework.pid().getOrElse(UPID())); message.mutable_framework_info()->CopyFrom(framework.info); send(slave->pid, message); } @@ -2979,7 +2980,7 @@ void Master::_subscribe( // If we are here the framework is subscribing for the first time. // Check if this framework is already subscribed (because it retries). foreachvalue (Framework* framework, frameworks.registered) { - if (framework->pid == from) { + if (framework->pid() == from) { LOG(INFO) << "Framework " << *framework << " already subscribed, resending acknowledgement"; @@ -3021,7 +3022,7 @@ void Master::_subscribe( // response because that would go to the framework that is already connected. if (frameworks.principals.contains(from)) { foreachvalue (Framework* framework, frameworks.registered) { - if (framework->pid == from && framework->id() != frameworkInfo.id()) { + if (framework->pid() == from && framework->id() != frameworkInfo.id()) { LOG(ERROR) << "Dropping SUBSCRIBE call for framework '" << frameworkInfo.name() << "': " << *framework << " already connected at " << from; @@ -3073,7 +3074,7 @@ void Master::_subscribe( // another instance of their scheduler has reconnected. // Test for the error case first. - if ((framework->pid != from) && !force) { + if ((framework->pid() != from) && !force) { LOG(ERROR) << "Disallowing subscription attempt of" << " framework " << *framework << " because it is not expected from " << from; @@ -3127,7 +3128,7 @@ void Master::_subscribe( // Relink to the framework. This might be necessary if the // framework link previously broke. - link(framework->pid.get()); + link(framework->pid().get()); // Reactivate the framework. // NOTE: We do this after recovering resources (above) so that @@ -3249,7 +3250,7 @@ void Master::unregisterFramework( Framework* framework = getFramework(frameworkId); if (framework != nullptr) { - if (framework->pid == from) { + if (framework->pid() == from) { teardown(framework); } else { LOG(WARNING) @@ -3276,7 +3277,7 @@ void Master::deactivateFramework( return; } - if (framework->pid != from) { + if (framework->pid() != from) { LOG(WARNING) << "Ignoring deactivate framework message for framework " << *framework << " because it is not expected from " << from; @@ -3311,10 +3312,10 @@ void Master::disconnect(Framework* framework) framework->setFrameworkState(Framework::State::DISCONNECTED); - if (framework->pid.isSome()) { + if (framework->pid().isSome()) { // Remove the framework from authenticated. This is safe because // a framework will always reauthenticate before (re-)registering. - authenticated.erase(framework->pid.get()); + authenticated.erase(framework->pid().get()); } else { framework->closeHttpConnection(); } @@ -3432,7 +3433,7 @@ void Master::resourceRequest( return; } - if (framework->pid != from) { + if (framework->pid() != from) { LOG(WARNING) << "Ignoring resource request message from framework " << *framework << " because it is not expected from " << from; @@ -3647,7 +3648,7 @@ void Master::launchTasks( return; } - if (framework->pid != from) { + if (framework->pid() != from) { LOG(WARNING) << "Ignoring launch tasks message for offers " << stringify(launchTasksMessage.offer_ids()) @@ -4116,7 +4117,7 @@ void Master::accept( case Offer::Operation::SHRINK_VOLUME: case Offer::Operation::CREATE_DISK: case Offer::Operation::DESTROY_DISK: { - if (framework->http.isNone()) { + if (framework->http().isNone()) { const string message = "The 'id' field was set in an offer operation, but operation" " feedback is not supported for the SchedulerDriver API"; @@ -5230,7 +5231,7 @@ void Master::_accept( // TODO(anand): We set 'pid' to UPID() for http frameworks // as 'pid' was made optional in 0.24.0. In 0.25.0, we // no longer have to set pid here for http frameworks. - message.set_pid(framework->pid.getOrElse(UPID())); + message.set_pid(framework->pid().getOrElse(UPID())); message.mutable_task()->MergeFrom(task); message.set_launch_executor(launchExecutor); @@ -5928,7 +5929,7 @@ void Master::reviveOffers( return; } - if (framework->pid != from) { + if (framework->pid() != from) { LOG(WARNING) << "Ignoring revive offers message for framework " << *framework << " because it is not expected from " << from; @@ -6000,7 +6001,7 @@ void Master::killTask( return; } - if (framework->pid != from) { + if (framework->pid() != from) { LOG(WARNING) << "Ignoring kill task message for task " << taskId << " of framework " << *framework << " because it is not expected from " << from; @@ -6168,7 +6169,7 @@ void Master::statusUpdateAcknowledgement( return; } - if (framework->pid != from) { + if (framework->pid() != from) { LOG(WARNING) << "Ignoring status update acknowledgement for status " << uuid_.get() << " of task " << taskId << " of framework " @@ -6434,7 +6435,7 @@ void Master::schedulerMessage( return; } - if (framework->pid != from) { + if (framework->pid() != from) { LOG(WARNING) << "Ignoring framework message for executor '" << executorId << "' of framework " << *framework @@ -7797,7 +7798,7 @@ void Master::updateSlaveFrameworks( // TODO(anand): We set 'pid' to UPID() for http frameworks // as 'pid' was made optional in 0.24.0. In 0.25.0, we // no longer have to set pid here for http frameworks. - message.set_pid(framework->pid.getOrElse(UPID())); + message.set_pid(framework->pid().getOrElse(UPID())); send(slave->pid, message); } else { @@ -9225,7 +9226,7 @@ void Master::sendBulkOperationFeedback( Option<Framework*> framework = frameworks.registered.get(operation->framework_id()); - if (!framework.isSome() || !framework.get()->http.isSome()) { + if (!framework.isSome() || !framework.get()->http().isSome()) { continue; } @@ -9268,7 +9269,7 @@ void Master::reconcileTasks( return; } - if (framework->pid != from) { + if (framework->pid() != from) { LOG(WARNING) << "Ignoring reconcile tasks message for framework " << *framework << " because it is not expected from " << from; @@ -10388,13 +10389,13 @@ void Master::addFramework( frameworks.registered[framework->id()] = framework; if (framework->connected()) { - if (framework->pid.isSome()) { - link(framework->pid.get()); + if (framework->pid().isSome()) { + link(framework->pid().get()); } else { - CHECK_SOME(framework->http); + CHECK_SOME(framework->http()); const StreamingHttpConnection<v1::scheduler::Event>& http = - framework->http.get(); + framework->http().get(); http.closed() .onAny(defer(self(), &Self::exited, framework->id(), http)); @@ -10417,9 +10418,9 @@ void Master::addFramework( ? Option<string>(framework->info.principal()) : None(); - if (framework->pid.isSome()) { - CHECK(!frameworks.principals.contains(framework->pid.get())); - frameworks.principals.put(framework->pid.get(), principal); + if (framework->pid().isSome()) { + CHECK(!frameworks.principals.contains(framework->pid().get())); + frameworks.principals.put(framework->pid().get(), principal); } if (principal.isSome()) { @@ -10555,8 +10556,8 @@ void Master::activateRecoveredFramework( CHECK(framework->recovered()); CHECK(framework->offers.empty()); CHECK(framework->inverseOffers.empty()); - CHECK(framework->pid.isNone()); - CHECK(framework->http.isNone()); + CHECK(framework->pid().isNone()); + CHECK(framework->http().isNone()); updateFramework(framework, frameworkInfo, suppressedRoles); @@ -10587,9 +10588,9 @@ void Master::activateRecoveredFramework( ? Option<string>(framework->info.principal()) : None(); - if (framework->pid.isSome()) { - CHECK(!frameworks.principals.contains(framework->pid.get())); - frameworks.principals.put(framework->pid.get(), principal); + if (framework->pid().isSome()) { + CHECK(!frameworks.principals.contains(framework->pid().get())); + frameworks.principals.put(framework->pid().get(), principal); } // We expect the framework metrics for this principal to be created @@ -10638,13 +10639,13 @@ void Master::failoverFramework( } // If this is an upgrade, clear the authentication related data. - if (framework->pid.isSome()) { - authenticated.erase(framework->pid.get()); + if (framework->pid().isSome()) { + authenticated.erase(framework->pid().get()); - CHECK(frameworks.principals.contains(framework->pid.get())); - Option<string> principal = frameworks.principals[framework->pid.get()]; + CHECK(frameworks.principals.contains(framework->pid().get())); + Option<string> principal = frameworks.principals[framework->pid().get()]; - frameworks.principals.erase(framework->pid.get()); + frameworks.principals.erase(framework->pid().get()); } framework->updateConnection(http); @@ -10665,7 +10666,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) { CHECK_NOTNULL(framework); - const Option<UPID> oldPid = framework->pid; + const Option<UPID> oldPid = framework->pid(); // There are a few failover cases to consider: // 1. The pid has changed or it was previously a HTTP based scheduler. @@ -10688,7 +10689,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) _failoverFramework(framework); - CHECK_SOME(framework->pid); + CHECK_SOME(framework->pid()); // Update the principal mapping for this framework, which is // needed to keep the per-principal framework metrics accurate. @@ -10901,7 +10902,7 @@ void Master::removeFramework(Framework* framework) // TODO(benh): unlink(framework->pid); - if (framework->http.isSome()) { + if (framework->http().isSome()) { framework->closeHttpConnection(); } @@ -10913,13 +10914,13 @@ void Master::removeFramework(Framework* framework) // TODO(anand): This only works for pid based frameworks. We would // need similar authentication logic for http frameworks. - if (framework->pid.isSome()) { - authenticated.erase(framework->pid.get()); + if (framework->pid().isSome()) { + authenticated.erase(framework->pid().get()); - CHECK(frameworks.principals.contains(framework->pid.get())); - Option<string> principal = frameworks.principals[framework->pid.get()]; + CHECK(frameworks.principals.contains(framework->pid().get())); + Option<string> principal = frameworks.principals[framework->pid().get()]; - frameworks.principals.erase(framework->pid.get()); + frameworks.principals.erase(framework->pid().get()); // Remove the metrics for the principal if this framework is the // last one with this principal. diff --git a/src/master/master.hpp b/src/master/master.hpp index c813e9f..f1aa40f 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -2548,6 +2548,13 @@ struct Framework void setFrameworkState(const State& _state); + const Option<StreamingHttpConnection<v1::scheduler::Event>>& http() const + { + return http_; + } + + const Option<process::UPID>& pid() const { return pid_; } + Master* const master; FrameworkInfo info; @@ -2556,13 +2563,6 @@ struct Framework protobuf::framework::Capabilities capabilities; - // Frameworks can either be connected via HTTP or by message passing - // (scheduler driver). At most one of `http` and `pid` will be set - // according to the last connection made by the framework; neither - // field will be set if the framework is in state `RECOVERED`. - Option<StreamingHttpConnection<v1::scheduler::Event>> http; - Option<process::UPID> pid; - State state; process::Time registeredTime; @@ -2640,10 +2640,6 @@ struct Framework Resources totalOfferedResources; hashmap<SlaveID, Resources> offeredResources; - // This is only set for HTTP frameworks. - process::Owned<ResponseHeartbeater<scheduler::Event, v1::scheduler::Event>> - heartbeater; - // This is used for per-framework metrics. FrameworkMetrics metrics; @@ -2656,6 +2652,17 @@ private: Framework(const Framework&); // No copying. Framework& operator=(const Framework&); // No assigning. + + // Frameworks can either be connected via HTTP or by message passing + // (scheduler driver). At most one of `http` and `pid` will be set + // according to the last connection made by the framework; neither + // field will be set if the framework is in state `RECOVERED`. + Option<StreamingHttpConnection<v1::scheduler::Event>> http_; + Option<process::UPID> pid_; + + // This is only set for HTTP frameworks. + process::Owned<ResponseHeartbeater<scheduler::Event, v1::scheduler::Event>> + heartbeater; }; @@ -2682,13 +2689,13 @@ void Framework::send(const Message& message) // that one of `http` or `pid` is set if the framework is connected. } - if (http.isSome()) { - if (!http->send(message)) { + if (http_.isSome()) { + if (!http_->send(message)) { LOG(WARNING) << "Unable to send message to framework " << *this << ":" << " connection closed"; } - } else if (pid.isSome()) { - master->send(pid.get(), message); + } else if (pid().isSome()) { + master->send(pid().get(), message); } else { LOG(WARNING) << "Unable to send message to framework " << *this << ":" << " framework is recovered but has not reregistered"; @@ -2730,8 +2737,8 @@ inline std::ostream& operator<<( // updated on framework failover (MESOS-1784). stream << framework.id() << " (" << framework.info.name() << ")"; - if (framework.pid.isSome()) { - stream << " at " << framework.pid.get(); + if (framework.pid().isSome()) { + stream << " at " << framework.pid().get(); } return stream; diff --git a/src/master/readonly_handler.cpp b/src/master/readonly_handler.cpp index 40005a2..341a75a 100644 --- a/src/master/readonly_handler.cpp +++ b/src/master/readonly_handler.cpp @@ -483,8 +483,8 @@ void json(JSON::ObjectWriter* writer, const Summary<Framework>& summary) writer->field("name", framework.info.name()); // Omit pid for http frameworks. - if (framework.pid.isSome()) { - writer->field("pid", string(framework.pid.get())); + if (framework.pid().isSome()) { + writer->field("pid", string(framework.pid().get())); } // TODO(bmahler): Use these in the webui.
