Passed the message directly to the Master::registerSlave handler. Some fields in `RegisterSlaveMessage` will become optiona. This patch prepares for that. Also, by passing a message directly to the handler, it allows us to eliminate some copying by using rvalue references.
Review: https://reviews.apache.org/r/64487 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c3157d7e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c3157d7e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c3157d7e Branch: refs/heads/master Commit: c3157d7ea328405aa1c9c05778b8ffc01884ac38 Parents: c9861e1 Author: Jie Yu <[email protected]> Authored: Sun Dec 10 08:59:35 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Mon Dec 11 14:02:00 2017 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 91 ++++++++++++++++++------------------------ src/master/master.hpp | 18 ++------- src/master/validation.cpp | 13 +++--- src/master/validation.hpp | 8 +--- 4 files changed, 49 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 55e9195..120cb75 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -847,12 +847,7 @@ void Master::initialize() &FrameworkToExecutorMessage::data); install<RegisterSlaveMessage>( - &Master::registerSlave, - &RegisterSlaveMessage::slave, - &RegisterSlaveMessage::checkpointed_resources, - &RegisterSlaveMessage::version, - &RegisterSlaveMessage::agent_capabilities, - &RegisterSlaveMessage::resource_version_uuids); + &Master::registerSlave); install<ReregisterSlaveMessage>( &Master::reregisterSlave); @@ -6034,11 +6029,7 @@ void Master::message( void Master::registerSlave( const UPID& from, - const SlaveInfo& slaveInfo, - const vector<Resource>& checkpointedResources, - const string& version, - const vector<SlaveInfo::Capability>& agentCapabilities, - const vector<ResourceVersionUUID>& resourceVersions) + RegisterSlaveMessage&& registerSlaveMessage) { ++metrics->messages_register_slave; @@ -6050,11 +6041,7 @@ void Master::registerSlave( .onReady(defer(self(), &Self::registerSlave, from, - slaveInfo, - checkpointedResources, - version, - agentCapabilities, - resourceVersions)); + std::move(registerSlaveMessage))); return; } @@ -6070,8 +6057,8 @@ void Master::registerSlave( return; } - Option<Error> error = validation::master::message::registerSlave( - slaveInfo, checkpointedResources); + Option<Error> error = + validation::master::message::registerSlave(registerSlaveMessage); if (error.isSome()) { LOG(WARNING) << "Dropping registration of agent at " << from @@ -6082,13 +6069,13 @@ void Master::registerSlave( if (slaves.registering.contains(from)) { LOG(INFO) << "Ignoring register agent message from " << from - << " (" << slaveInfo.hostname() << ") as registration" - << " is already in progress"; + << " (" << registerSlaveMessage.slave().hostname() + << ") as registration is already in progress"; return; } - LOG(INFO) << "Received register agent message from " - << from << " (" << slaveInfo.hostname() << ")"; + LOG(INFO) << "Received register agent message from " << from + << " (" << registerSlaveMessage.slave().hostname() << ")"; slaves.registering.insert(from); @@ -6098,13 +6085,13 @@ void Master::registerSlave( // master (e.g. when writing to the registry). // TODO(bevers): Also convert the resources in `ExecutorInfos` and `Tasks` // here for consistency. - SlaveInfo _slaveInfo(slaveInfo); convertResourceFormat( - _slaveInfo.mutable_resources(), POST_RESERVATION_REFINEMENT); + registerSlaveMessage.mutable_slave()->mutable_resources(), + POST_RESERVATION_REFINEMENT); - std::vector<Resource> _checkpointedResources(checkpointedResources); convertResourceFormat( - &_checkpointedResources, POST_RESERVATION_REFINEMENT); + registerSlaveMessage.mutable_checkpointed_resources(), + POST_RESERVATION_REFINEMENT); // Note that the principal may be empty if authentication is not // required. Also it is passed along because it may be removed from @@ -6114,30 +6101,24 @@ void Master::registerSlave( authorizeSlave(principal) .onAny(defer(self(), &Self::_registerSlave, - _slaveInfo, from, + std::move(registerSlaveMessage), principal, - _checkpointedResources, - version, - agentCapabilities, - resourceVersions, lambda::_1)); } void Master::_registerSlave( - const SlaveInfo& slaveInfo, const UPID& pid, + RegisterSlaveMessage&& registerSlaveMessage, const Option<string>& principal, - const vector<Resource>& checkpointedResources, - const string& version, - const vector<SlaveInfo::Capability>& agentCapabilities, - const vector<ResourceVersionUUID>& resourceVersions, const Future<bool>& authorized) { CHECK(!authorized.isDiscarded()); CHECK(slaves.registering.contains(pid)); + const SlaveInfo& slaveInfo = registerSlaveMessage.slave(); + Option<string> authorizationError = None(); if (authorized.isFailed()) { @@ -6189,6 +6170,7 @@ void Master::_registerSlave( // Ignore registration attempts by agents running old Mesos versions. // We expect that the agent's version is in SemVer format; if the // version cannot be parsed, the registration attempt is ignored. + const string& version = registerSlaveMessage.version(); Try<Version> parsedVersion = Version::parse(version); if (parsedVersion.isError()) { @@ -6253,38 +6235,36 @@ void Master::_registerSlave( } // Create and add the slave id. - SlaveInfo slaveInfo_ = slaveInfo; - slaveInfo_.mutable_id()->CopyFrom(newSlaveId()); + SlaveID slaveId = newSlaveId(); LOG(INFO) << "Registering agent at " << pid << " (" - << slaveInfo.hostname() << ") with id " << slaveInfo_.id(); + << slaveInfo.hostname() << ") with id " << slaveId; + + SlaveInfo slaveInfo_ = slaveInfo; + slaveInfo_.mutable_id()->CopyFrom(slaveId); + + registerSlaveMessage.mutable_slave()->mutable_id()->CopyFrom(slaveId); registrar->apply(Owned<Operation>(new AdmitSlave(slaveInfo_))) .onAny(defer(self(), &Self::__registerSlave, - slaveInfo_, pid, - checkpointedResources, - version, - agentCapabilities, - resourceVersions, + std::move(registerSlaveMessage), lambda::_1)); } void Master::__registerSlave( - const SlaveInfo& slaveInfo, const UPID& pid, - const vector<Resource>& checkpointedResources, - const string& version, - const vector<SlaveInfo::Capability>& agentCapabilities, - const vector<ResourceVersionUUID>& resourceVersions, + RegisterSlaveMessage&& registerSlaveMessage, const Future<bool>& admit) { CHECK(slaves.registering.contains(pid)); CHECK(!admit.isDiscarded()); + const SlaveInfo& slaveInfo = registerSlaveMessage.slave(); + if (admit.isFailed()) { LOG(FATAL) << "Failed to admit agent " << slaveInfo.id() << " at " << pid << " (" << slaveInfo.hostname() << "): " << admit.failure(); @@ -6312,17 +6292,22 @@ void Master::__registerSlave( machineId.set_hostname(slaveInfo.hostname()); machineId.set_ip(stringify(pid.address.ip)); + vector<SlaveInfo::Capability> agentCapabilities = google::protobuf::convert( + std::move(*registerSlaveMessage.mutable_agent_capabilities())); + vector<Resource> checkpointedResources = google::protobuf::convert( + std::move(*registerSlaveMessage.mutable_checkpointed_resources())); + Slave* slave = new Slave( this, slaveInfo, pid, machineId, - version, - agentCapabilities, + registerSlaveMessage.version(), + std::move(agentCapabilities), Clock::now(), - checkpointedResources, + std::move(checkpointedResources), protobuf::parseResourceVersions( - {resourceVersions.begin(), resourceVersions.end()})); + registerSlaveMessage.resource_version_uuids())); ++metrics->slave_registrations; http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 5c26f20..7411e0b 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -466,11 +466,7 @@ public: void registerSlave( const process::UPID& from, - const SlaveInfo& slaveInfo, - const std::vector<Resource>& checkpointedResources, - const std::string& version, - const std::vector<SlaveInfo::Capability>& agentCapabilities, - const std::vector<ResourceVersionUUID>& resourceVersions); + RegisterSlaveMessage&& registerSlaveMessage); void reregisterSlave( const process::UPID& from, @@ -588,22 +584,14 @@ protected: void recoveredSlavesTimeout(const Registry& registry); void _registerSlave( - const SlaveInfo& slaveInfo, const process::UPID& pid, + RegisterSlaveMessage&& registerSlaveMessage, const Option<std::string>& principal, - const std::vector<Resource>& checkpointedResources, - const std::string& version, - const std::vector<SlaveInfo::Capability>& agentCapabilities, - const std::vector<ResourceVersionUUID>& resourceVersions, const process::Future<bool>& authorized); void __registerSlave( - const SlaveInfo& slaveInfo, const process::UPID& pid, - const std::vector<Resource>& checkpointedResources, - const std::string& version, - const std::vector<SlaveInfo::Capability>& agentCapabilities, - const std::vector<ResourceVersionUUID>& resourceVersions, + RegisterSlaveMessage&& registerSlaveMessage, const process::Future<bool>& admit); void _reregisterSlave( http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index 585d8bf..a9b0805 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -264,23 +264,23 @@ static Option<Error> validateSlaveInfo(const SlaveInfo& slaveInfo) } -Option<Error> registerSlave( - const SlaveInfo& slaveInfo, - const vector<Resource>& checkpointedResources) +Option<Error> registerSlave(const RegisterSlaveMessage& message) { + const SlaveInfo& slaveInfo = message.slave(); + Option<Error> error = validateSlaveInfo(slaveInfo); if (error.isSome()) { return error.get(); } - if (!checkpointedResources.empty()) { + if (!message.checkpointed_resources().empty()) { if (!slaveInfo.has_checkpoint() || !slaveInfo.checkpoint()) { return Error( "Checkpointed resources provided when checkpointing is not enabled"); } } - foreach (const Resource& resource, checkpointedResources) { + foreach (const Resource& resource, message.checkpointed_resources()) { error = Resources::validate(resource); if (error.isSome()) { return error.get(); @@ -291,8 +291,7 @@ Option<Error> registerSlave( } -Option<Error> reregisterSlave( - const ReregisterSlaveMessage& message) +Option<Error> reregisterSlave(const ReregisterSlaveMessage& message) { hashset<FrameworkID> frameworkIDs; hashset<pair<FrameworkID, ExecutorID>> executorIDs; http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/validation.hpp ---------------------------------------------------------------------- diff --git a/src/master/validation.hpp b/src/master/validation.hpp index 30db3bf..7c129ce 100644 --- a/src/master/validation.hpp +++ b/src/master/validation.hpp @@ -66,12 +66,8 @@ namespace message { // guarantees at the libprocess level that would prevent arbitrary UPID // impersonation (MESOS-7424). -Option<Error> registerSlave( - const SlaveInfo& slaveInfo, - const std::vector<Resource>& checkpointedResources); - -Option<Error> reregisterSlave( - const ReregisterSlaveMessage& message); +Option<Error> registerSlave(const RegisterSlaveMessage& message); +Option<Error> reregisterSlave(const ReregisterSlaveMessage& message); } // namespace message { } // namespace master {
