Passed the message directly to the Master::registerSlave handler.

Some fields in `RegisterSlaveMessage` will become optional. This patch
prepares for that. Also, by passing a message directly to the handler,
it allows us to eliminate some copying by using rvalue references.

Review: https://reviews.apache.org/r/64487


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c3157d7e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c3157d7e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c3157d7e

Branch: refs/heads/master
Commit: c3157d7ea328405aa1c9c05778b8ffc01884ac38
Parents: c9861e1
Author: Jie Yu <[email protected]>
Authored: Sun Dec 10 08:59:35 2017 -0800
Committer: Jie Yu <[email protected]>
Committed: Mon Dec 11 14:02:00 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp     | 91 ++++++++++++++++++------------------------
 src/master/master.hpp     | 18 ++-------
 src/master/validation.cpp | 13 +++---
 src/master/validation.hpp |  8 +---
 4 files changed, 49 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 55e9195..120cb75 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -847,12 +847,7 @@ void Master::initialize()
       &FrameworkToExecutorMessage::data);
 
   install<RegisterSlaveMessage>(
-      &Master::registerSlave,
-      &RegisterSlaveMessage::slave,
-      &RegisterSlaveMessage::checkpointed_resources,
-      &RegisterSlaveMessage::version,
-      &RegisterSlaveMessage::agent_capabilities,
-      &RegisterSlaveMessage::resource_version_uuids);
+      &Master::registerSlave);
 
   install<ReregisterSlaveMessage>(
       &Master::reregisterSlave);
@@ -6034,11 +6029,7 @@ void Master::message(
 
 void Master::registerSlave(
     const UPID& from,
-    const SlaveInfo& slaveInfo,
-    const vector<Resource>& checkpointedResources,
-    const string& version,
-    const vector<SlaveInfo::Capability>& agentCapabilities,
-    const vector<ResourceVersionUUID>& resourceVersions)
+    RegisterSlaveMessage&& registerSlaveMessage)
 {
   ++metrics->messages_register_slave;
 
@@ -6050,11 +6041,7 @@ void Master::registerSlave(
       .onReady(defer(self(),
                      &Self::registerSlave,
                      from,
-                     slaveInfo,
-                     checkpointedResources,
-                     version,
-                     agentCapabilities,
-                     resourceVersions));
+                     std::move(registerSlaveMessage)));
     return;
   }
 
@@ -6070,8 +6057,8 @@ void Master::registerSlave(
     return;
   }
 
-  Option<Error> error = validation::master::message::registerSlave(
-      slaveInfo, checkpointedResources);
+  Option<Error> error =
+    validation::master::message::registerSlave(registerSlaveMessage);
 
   if (error.isSome()) {
     LOG(WARNING) << "Dropping registration of agent at " << from
@@ -6082,13 +6069,13 @@ void Master::registerSlave(
 
   if (slaves.registering.contains(from)) {
     LOG(INFO) << "Ignoring register agent message from " << from
-              << " (" << slaveInfo.hostname() << ") as registration"
-              << " is already in progress";
+              << " (" << registerSlaveMessage.slave().hostname()
+              << ") as registration is already in progress";
     return;
   }
 
-  LOG(INFO) << "Received register agent message from "
-            << from << " (" << slaveInfo.hostname() << ")";
+  LOG(INFO) << "Received register agent message from " << from
+            << " (" << registerSlaveMessage.slave().hostname() << ")";
 
   slaves.registering.insert(from);
 
@@ -6098,13 +6085,13 @@ void Master::registerSlave(
   // master (e.g. when writing to the registry).
   // TODO(bevers): Also convert the resources in `ExecutorInfos` and `Tasks`
   // here for consistency.
-  SlaveInfo _slaveInfo(slaveInfo);
   convertResourceFormat(
-      _slaveInfo.mutable_resources(), POST_RESERVATION_REFINEMENT);
+      registerSlaveMessage.mutable_slave()->mutable_resources(),
+      POST_RESERVATION_REFINEMENT);
 
-  std::vector<Resource> _checkpointedResources(checkpointedResources);
   convertResourceFormat(
-      &_checkpointedResources, POST_RESERVATION_REFINEMENT);
+      registerSlaveMessage.mutable_checkpointed_resources(),
+      POST_RESERVATION_REFINEMENT);
 
   // Note that the principal may be empty if authentication is not
   // required. Also it is passed along because it may be removed from
@@ -6114,30 +6101,24 @@ void Master::registerSlave(
   authorizeSlave(principal)
     .onAny(defer(self(),
                  &Self::_registerSlave,
-                 _slaveInfo,
                  from,
+                 std::move(registerSlaveMessage),
                  principal,
-                 _checkpointedResources,
-                 version,
-                 agentCapabilities,
-                 resourceVersions,
                  lambda::_1));
 }
 
 
 void Master::_registerSlave(
-    const SlaveInfo& slaveInfo,
     const UPID& pid,
+    RegisterSlaveMessage&& registerSlaveMessage,
     const Option<string>& principal,
-    const vector<Resource>& checkpointedResources,
-    const string& version,
-    const vector<SlaveInfo::Capability>& agentCapabilities,
-    const vector<ResourceVersionUUID>& resourceVersions,
     const Future<bool>& authorized)
 {
   CHECK(!authorized.isDiscarded());
   CHECK(slaves.registering.contains(pid));
 
+  const SlaveInfo& slaveInfo = registerSlaveMessage.slave();
+
   Option<string> authorizationError = None();
 
   if (authorized.isFailed()) {
@@ -6189,6 +6170,7 @@ void Master::_registerSlave(
   // Ignore registration attempts by agents running old Mesos versions.
   // We expect that the agent's version is in SemVer format; if the
   // version cannot be parsed, the registration attempt is ignored.
+  const string& version = registerSlaveMessage.version();
   Try<Version> parsedVersion = Version::parse(version);
 
   if (parsedVersion.isError()) {
@@ -6253,38 +6235,36 @@ void Master::_registerSlave(
   }
 
   // Create and add the slave id.
-  SlaveInfo slaveInfo_ = slaveInfo;
-  slaveInfo_.mutable_id()->CopyFrom(newSlaveId());
+  SlaveID slaveId = newSlaveId();
 
   LOG(INFO) << "Registering agent at " << pid << " ("
-            << slaveInfo.hostname() << ") with id " << slaveInfo_.id();
+            << slaveInfo.hostname() << ") with id " << slaveId;
+
+  SlaveInfo slaveInfo_ = slaveInfo;
+  slaveInfo_.mutable_id()->CopyFrom(slaveId);
+
+  registerSlaveMessage.mutable_slave()->mutable_id()->CopyFrom(slaveId);
 
   registrar->apply(Owned<Operation>(new AdmitSlave(slaveInfo_)))
     .onAny(defer(self(),
                  &Self::__registerSlave,
-                 slaveInfo_,
                  pid,
-                 checkpointedResources,
-                 version,
-                 agentCapabilities,
-                 resourceVersions,
+                 std::move(registerSlaveMessage),
                  lambda::_1));
 }
 
 
 void Master::__registerSlave(
-    const SlaveInfo& slaveInfo,
     const UPID& pid,
-    const vector<Resource>& checkpointedResources,
-    const string& version,
-    const vector<SlaveInfo::Capability>& agentCapabilities,
-    const vector<ResourceVersionUUID>& resourceVersions,
+    RegisterSlaveMessage&& registerSlaveMessage,
     const Future<bool>& admit)
 {
   CHECK(slaves.registering.contains(pid));
 
   CHECK(!admit.isDiscarded());
 
+  const SlaveInfo& slaveInfo = registerSlaveMessage.slave();
+
   if (admit.isFailed()) {
     LOG(FATAL) << "Failed to admit agent " << slaveInfo.id() << " at " << pid
                << " (" << slaveInfo.hostname() << "): " << admit.failure();
@@ -6312,17 +6292,22 @@ void Master::__registerSlave(
   machineId.set_hostname(slaveInfo.hostname());
   machineId.set_ip(stringify(pid.address.ip));
 
+  vector<SlaveInfo::Capability> agentCapabilities = google::protobuf::convert(
+      std::move(*registerSlaveMessage.mutable_agent_capabilities()));
+  vector<Resource> checkpointedResources = google::protobuf::convert(
+      std::move(*registerSlaveMessage.mutable_checkpointed_resources()));
+
   Slave* slave = new Slave(
       this,
       slaveInfo,
       pid,
       machineId,
-      version,
-      agentCapabilities,
+      registerSlaveMessage.version(),
+      std::move(agentCapabilities),
       Clock::now(),
-      checkpointedResources,
+      std::move(checkpointedResources),
       protobuf::parseResourceVersions(
-          {resourceVersions.begin(), resourceVersions.end()}));
+          registerSlaveMessage.resource_version_uuids()));
 
   ++metrics->slave_registrations;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5c26f20..7411e0b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -466,11 +466,7 @@ public:
 
   void registerSlave(
       const process::UPID& from,
-      const SlaveInfo& slaveInfo,
-      const std::vector<Resource>& checkpointedResources,
-      const std::string& version,
-      const std::vector<SlaveInfo::Capability>& agentCapabilities,
-      const std::vector<ResourceVersionUUID>& resourceVersions);
+      RegisterSlaveMessage&& registerSlaveMessage);
 
   void reregisterSlave(
       const process::UPID& from,
@@ -588,22 +584,14 @@ protected:
   void recoveredSlavesTimeout(const Registry& registry);
 
   void _registerSlave(
-      const SlaveInfo& slaveInfo,
       const process::UPID& pid,
+      RegisterSlaveMessage&& registerSlaveMessage,
       const Option<std::string>& principal,
-      const std::vector<Resource>& checkpointedResources,
-      const std::string& version,
-      const std::vector<SlaveInfo::Capability>& agentCapabilities,
-      const std::vector<ResourceVersionUUID>& resourceVersions,
       const process::Future<bool>& authorized);
 
   void __registerSlave(
-      const SlaveInfo& slaveInfo,
       const process::UPID& pid,
-      const std::vector<Resource>& checkpointedResources,
-      const std::string& version,
-      const std::vector<SlaveInfo::Capability>& agentCapabilities,
-      const std::vector<ResourceVersionUUID>& resourceVersions,
+      RegisterSlaveMessage&& registerSlaveMessage,
       const process::Future<bool>& admit);
 
   void _reregisterSlave(

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 585d8bf..a9b0805 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -264,23 +264,23 @@ static Option<Error> validateSlaveInfo(const SlaveInfo& slaveInfo)
 }
 
 
-Option<Error> registerSlave(
-    const SlaveInfo& slaveInfo,
-    const vector<Resource>& checkpointedResources)
+Option<Error> registerSlave(const RegisterSlaveMessage& message)
 {
+  const SlaveInfo& slaveInfo = message.slave();
+
   Option<Error> error = validateSlaveInfo(slaveInfo);
   if (error.isSome()) {
     return error.get();
   }
 
-  if (!checkpointedResources.empty()) {
+  if (!message.checkpointed_resources().empty()) {
     if (!slaveInfo.has_checkpoint() || !slaveInfo.checkpoint()) {
       return Error(
           "Checkpointed resources provided when checkpointing is not enabled");
     }
   }
 
-  foreach (const Resource& resource, checkpointedResources) {
+  foreach (const Resource& resource, message.checkpointed_resources()) {
     error = Resources::validate(resource);
     if (error.isSome()) {
       return error.get();
@@ -291,8 +291,7 @@ Option<Error> registerSlave(
 }
 
 
-Option<Error> reregisterSlave(
-    const ReregisterSlaveMessage& message)
+Option<Error> reregisterSlave(const ReregisterSlaveMessage& message)
 {
   hashset<FrameworkID> frameworkIDs;
   hashset<pair<FrameworkID, ExecutorID>> executorIDs;

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3157d7e/src/master/validation.hpp
----------------------------------------------------------------------
diff --git a/src/master/validation.hpp b/src/master/validation.hpp
index 30db3bf..7c129ce 100644
--- a/src/master/validation.hpp
+++ b/src/master/validation.hpp
@@ -66,12 +66,8 @@ namespace message {
 // guarantees at the libprocess level that would prevent arbitrary UPID
 // impersonation (MESOS-7424).
 
-Option<Error> registerSlave(
-    const SlaveInfo& slaveInfo,
-    const std::vector<Resource>& checkpointedResources);
-
-Option<Error> reregisterSlave(
-    const ReregisterSlaveMessage& message);
+Option<Error> registerSlave(const RegisterSlaveMessage& message);
+Option<Error> reregisterSlave(const ReregisterSlaveMessage& message);
 
 } // namespace message {
 } // namespace master {

Reply via email to