Changed agent re-registration to work with messages directly. `reregisterSlave` now accepts `ReregisterSlaveMessage&&`, which opts out of using the protobuf arena and allows the message to be passed through the dispatch chain without making any copies. Repeated message fields are converted to `std::vector`s only when needed.
Review: https://reviews.apache.org/r/63914/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b422804 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b422804 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b422804 Branch: refs/heads/master Commit: 8b4228049292cab6a8c4a3680de3c1aa5e72a9ff Parents: 4934eb7 Author: Dmitry Zhuk <[email protected]> Authored: Wed Dec 6 09:03:49 2017 -0800 Committer: Michael Park <[email protected]> Committed: Wed Dec 6 12:21:27 2017 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 194 ++++++++++------------------- src/master/master.hpp | 38 +----- src/master/validation.cpp | 15 +-- src/master/validation.hpp | 6 +- src/tests/master_validation_tests.cpp | 57 +++++---- 5 files changed, 107 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 16cdde7..d66f956 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -853,16 +853,7 @@ void Master::initialize() &RegisterSlaveMessage::resource_version_uuids); install<ReregisterSlaveMessage>( - &Master::reregisterSlave, - &ReregisterSlaveMessage::slave, - &ReregisterSlaveMessage::checkpointed_resources, - &ReregisterSlaveMessage::executor_infos, - &ReregisterSlaveMessage::tasks, - &ReregisterSlaveMessage::frameworks, - &ReregisterSlaveMessage::completed_frameworks, - &ReregisterSlaveMessage::version, - &ReregisterSlaveMessage::agent_capabilities, - &ReregisterSlaveMessage::resource_version_uuids); + &Master::reregisterSlave); install<UnregisterSlaveMessage>( &Master::unregisterSlave, @@ -6302,15 +6293,7 @@ void Master::__registerSlave( void Master::reregisterSlave( const UPID& from, - 
const SlaveInfo& slaveInfo, - const vector<Resource>& checkpointedResources, - const vector<ExecutorInfo>& executorInfos, - const vector<Task>& tasks, - const vector<FrameworkInfo>& frameworks, - const vector<Archive::Framework>& completedFrameworks, - const string& version, - const vector<SlaveInfo::Capability>& agentCapabilities, - const vector<ResourceVersionUUID>& resourceVersions) + ReregisterSlaveMessage&& reregisterSlaveMessage) { ++metrics->messages_reregister_slave; @@ -6322,15 +6305,7 @@ void Master::reregisterSlave( .onReady(defer(self(), &Self::reregisterSlave, from, - slaveInfo, - checkpointedResources, - executorInfos, - tasks, - frameworks, - completedFrameworks, - version, - agentCapabilities, - resourceVersions)); + std::move(reregisterSlaveMessage))); return; } @@ -6351,6 +6326,7 @@ void Master::reregisterSlave( // capabilities or a higher version (or a changed SlaveInfo, after Mesos 1.5). // However, this should very rarely happen in practice, and nobody seems to // have complained about it so far. + const SlaveInfo& slaveInfo = reregisterSlaveMessage.slave(); if (slaves.reregistering.contains(slaveInfo.id())) { LOG(INFO) << "Ignoring re-register agent message from agent " @@ -6377,22 +6353,8 @@ void Master::reregisterSlave( return; } - Option<Error> error = validation::master::message::reregisterSlave( - slaveInfo, tasks, checkpointedResources, executorInfos, frameworks); - - // Update all resources passed by the agent to `POST_RESERVATION_REFINEMENT` - // format. We do this as early as possible so that we only use a single - // format inside master, and downgrade again if necessary when they leave the - // master (e.g. when writing to the registry). - // TODO(bevers): Also convert the resources in `ExecutorInfos` and `Tasks` - // here for consistency. 
- SlaveInfo _slaveInfo(slaveInfo); - convertResourceFormat( - _slaveInfo.mutable_resources(), POST_RESERVATION_REFINEMENT); - - std::vector<Resource> _checkpointedResources(checkpointedResources); - convertResourceFormat( - &_checkpointedResources, POST_RESERVATION_REFINEMENT); + Option<Error> error = + validation::master::message::reregisterSlave(reregisterSlaveMessage); if (error.isSome()) { LOG(WARNING) << "Dropping re-registration of agent at " << from @@ -6407,6 +6369,20 @@ void Master::reregisterSlave( slaves.reregistering.insert(slaveInfo.id()); + // Update all resources passed by the agent to `POST_RESERVATION_REFINEMENT` + // format. We do this as early as possible so that we only use a single + // format inside master, and downgrade again if necessary when they leave the + // master (e.g. when writing to the registry). + // TODO(bevers): Also convert the resources in `ExecutorInfos` and `Tasks` + // here for consistency. + convertResourceFormat( + reregisterSlaveMessage.mutable_slave()->mutable_resources(), + POST_RESERVATION_REFINEMENT); + + convertResourceFormat( + reregisterSlaveMessage.mutable_checkpointed_resources(), + POST_RESERVATION_REFINEMENT); + // Note that the principal may be empty if authentication is not // required. Also it is passed along because it may be removed from // `authenticated` while the authorization is pending. 
@@ -6415,36 +6391,22 @@ void Master::reregisterSlave( authorizeSlave(principal) .onAny(defer(self(), &Self::_reregisterSlave, - _slaveInfo, from, + std::move(reregisterSlaveMessage), principal, - _checkpointedResources, - executorInfos, - tasks, - frameworks, - completedFrameworks, - version, - agentCapabilities, - resourceVersions, lambda::_1)); } void Master::_reregisterSlave( - const SlaveInfo& slaveInfo, const UPID& pid, + ReregisterSlaveMessage&& reregisterSlaveMessage, const Option<string>& principal, - const vector<Resource>& checkpointedResources, - const vector<ExecutorInfo>& executorInfos, - const vector<Task>& tasks, - const vector<FrameworkInfo>& frameworks, - const vector<Archive::Framework>& completedFrameworks, - const string& version, - const vector<SlaveInfo::Capability>& agentCapabilities, - const vector<ResourceVersionUUID>& resourceVersions, const Future<bool>& authorized) { CHECK(!authorized.isDiscarded()); + + const SlaveInfo& slaveInfo = reregisterSlaveMessage.slave(); CHECK(slaves.reregistering.contains(slaveInfo.id())); Option<string> authorizationError = None(); @@ -6516,6 +6478,7 @@ void Master::_reregisterSlave( // Ignore re-registration attempts by agents running old Mesos versions. // We expect that the agent's version is in SemVer format; if the // version cannot be parsed, the re-registration attempt is ignored. + const string& version = reregisterSlaveMessage.version(); Try<Version> parsedVersion = Version::parse(version); if (parsedVersion.isError()) { @@ -6587,27 +6550,15 @@ void Master::_reregisterSlave( // previously known state. 
if (slaveInfo == slave->info) { ___reregisterSlave( - slaveInfo, pid, - executorInfos, - tasks, - frameworks, - version, - agentCapabilities, - resourceVersions, + std::move(reregisterSlaveMessage), true); } else { registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo))) .onAny(defer(self(), &Self::___reregisterSlave, - slaveInfo, pid, - executorInfos, - tasks, - frameworks, - version, - agentCapabilities, - resourceVersions, + std::move(reregisterSlaveMessage), lambda::_1)); } } else if (slaves.recovered.contains(slaveInfo.id())) { @@ -6624,31 +6575,15 @@ void Master::_reregisterSlave( // previously known state (see also MESOS-7711). if (slaveInfo == recoveredInfo) { __reregisterSlave( - slaveInfo, pid, - checkpointedResources, - executorInfos, - tasks, - frameworks, - completedFrameworks, - version, - agentCapabilities, - resourceVersions, + std::move(reregisterSlaveMessage), true); } else { registrar->apply(Owned<Operation>(new UpdateSlave(slaveInfo))) .onAny(defer(self(), &Self::__reregisterSlave, - slaveInfo, pid, - checkpointedResources, - executorInfos, - tasks, - frameworks, - completedFrameworks, - version, - agentCapabilities, - resourceVersions, + std::move(reregisterSlaveMessage), lambda::_1)); } } else { @@ -6663,34 +6598,19 @@ void Master::_reregisterSlave( registrar->apply(Owned<Operation>(new MarkSlaveReachable(slaveInfo))) .onAny(defer(self(), &Self::__reregisterSlave, - slaveInfo, pid, - checkpointedResources, - executorInfos, - tasks, - frameworks, - completedFrameworks, - version, - agentCapabilities, - resourceVersions, + std::move(reregisterSlaveMessage), lambda::_1)); } } void Master::__reregisterSlave( - const SlaveInfo& slaveInfo, const UPID& pid, - const vector<Resource>& checkpointedResources, - const vector<ExecutorInfo>& executorInfos_, - const vector<Task>& tasks_, - const vector<FrameworkInfo>& frameworks, - const vector<Archive::Framework>& completedFrameworks_, - const string& version, - const vector<SlaveInfo::Capability>& 
agentCapabilities, - const vector<ResourceVersionUUID>& resourceVersions, + ReregisterSlaveMessage&& reregisterSlaveMessage, const Future<bool>& future) { + const SlaveInfo& slaveInfo = reregisterSlaveMessage.slave(); CHECK(slaves.reregistering.contains(slaveInfo.id())); if (future.isFailed()) { @@ -6752,12 +6672,22 @@ void Master::__reregisterSlave( } }; + vector<Resource> checkpointedResources = + google::protobuf::convert(reregisterSlaveMessage.checkpointed_resources()); + vector<ExecutorInfo> executorInfos = + google::protobuf::convert(reregisterSlaveMessage.executor_infos()); + vector<Task> tasks = + google::protobuf::convert(reregisterSlaveMessage.tasks()); + const vector<FrameworkInfo> frameworks = + google::protobuf::convert(reregisterSlaveMessage.frameworks()); + vector<Archive::Framework> completedFrameworks = + google::protobuf::convert(reregisterSlaveMessage.completed_frameworks()); + vector<SlaveInfo::Capability> agentCapabilities = + google::protobuf::convert(reregisterSlaveMessage.agent_capabilities()); + // Adjust the agent's task and executor infos to ensure // compatibility with old agents without certain capabilities. protobuf::slave::Capabilities slaveCapabilities(agentCapabilities); - vector<Task> tasks = tasks_; - vector<ExecutorInfo> executorInfos = executorInfos_; - vector<Archive::Framework> completedFrameworks = completedFrameworks_; // If the agent is not multi-role capable, inject allocation info. 
if (!slaveCapabilities.multiRole) { @@ -6845,14 +6775,14 @@ void Master::__reregisterSlave( slaveInfo, pid, machineId, - version, - agentCapabilities, + reregisterSlaveMessage.version(), + std::move(agentCapabilities), Clock::now(), - checkpointedResources, + std::move(checkpointedResources), protobuf::parseResourceVersions( - {resourceVersions.begin(), resourceVersions.end()}), - executorInfos, - recoveredTasks); + reregisterSlaveMessage.resource_version_uuids()), + std::move(executorInfos), + std::move(recoveredTasks)); slave->reregisteredTime = Clock::now(); @@ -6905,16 +6835,11 @@ void Master::__reregisterSlave( void Master::___reregisterSlave( - const SlaveInfo& slaveInfo, const process::UPID& pid, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<Task>& tasks, - const std::vector<FrameworkInfo>& frameworks, - const std::string& version, - const std::vector<SlaveInfo::Capability>& agentCapabilities, - const vector<ResourceVersionUUID>& resourceVersions, + ReregisterSlaveMessage&& reregisterSlaveMessage, const process::Future<bool>& updated) { + const SlaveInfo& slaveInfo = reregisterSlaveMessage.slave(); CHECK(slaves.reregistering.contains(slaveInfo.id())); CHECK_READY(updated); @@ -6965,6 +6890,12 @@ void Master::___reregisterSlave( slave->pid = pid; link(slave->pid); + const string& version = reregisterSlaveMessage.version(); + const vector<SlaveInfo::Capability> agentCapabilities = + google::protobuf::convert(reregisterSlaveMessage.agent_capabilities()); + const vector<ResourceVersionUUID> resourceVersions = + google::protobuf::convert(reregisterSlaveMessage.resource_version_uuids()); + Try<Nothing> stateUpdated = slave->update(slaveInfo, version, agentCapabilities, resourceVersions); @@ -6993,6 +6924,13 @@ void Master::___reregisterSlave( slave->totalResources, agentCapabilities); + const vector<ExecutorInfo> executorInfos = + google::protobuf::convert(reregisterSlaveMessage.executor_infos()); + const vector<Task> tasks = + 
google::protobuf::convert(reregisterSlaveMessage.tasks()); + const vector<FrameworkInfo> frameworks = + google::protobuf::convert(reregisterSlaveMessage.frameworks()); + // Reconcile tasks between master and slave, and send the // `SlaveReregisteredMessage`. reconcileKnownSlave(slave, executorInfos, tasks); http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index d42acae..acea507 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -475,15 +475,7 @@ public: void reregisterSlave( const process::UPID& from, - const SlaveInfo& slaveInfo, - const std::vector<Resource>& checkpointedResources, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<Task>& tasks, - const std::vector<FrameworkInfo>& frameworks, - const std::vector<Archive::Framework>& completedFrameworks, - const std::string& version, - const std::vector<SlaveInfo::Capability>& agentCapabilities, - const std::vector<ResourceVersionUUID>& resourceVersions); + ReregisterSlaveMessage&& incomingMessage); void unregisterSlave( const process::UPID& from, @@ -616,41 +608,19 @@ protected: const process::Future<bool>& admit); void _reregisterSlave( - const SlaveInfo& slaveInfo, const process::UPID& pid, + ReregisterSlaveMessage&& incomingMessage, const Option<std::string>& principal, - const std::vector<Resource>& checkpointedResources, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<Task>& tasks, - const std::vector<FrameworkInfo>& frameworks, - const std::vector<Archive::Framework>& completedFrameworks, - const std::string& version, - const std::vector<SlaveInfo::Capability>& agentCapabilities, - const std::vector<ResourceVersionUUID>& resourceVersions, const process::Future<bool>& authorized); void __reregisterSlave( - const SlaveInfo& slaveInfo, const process::UPID& pid, - const std::vector<Resource>& 
checkpointedResources, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<Task>& tasks, - const std::vector<FrameworkInfo>& frameworks, - const std::vector<Archive::Framework>& completedFrameworks, - const std::string& version, - const std::vector<SlaveInfo::Capability>& agentCapabilities, - const std::vector<ResourceVersionUUID>& resourceVersions, + ReregisterSlaveMessage&& incomingMessage, const process::Future<bool>& readmit); void ___reregisterSlave( - const SlaveInfo& slaveInfo, const process::UPID& pid, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<Task>& tasks, - const std::vector<FrameworkInfo>& frameworks, - const std::string& version, - const std::vector<SlaveInfo::Capability>& agentCapabilities, - const std::vector<ResourceVersionUUID>& resourceVersions, + ReregisterSlaveMessage&& incomingMessage, const process::Future<bool>& updated); void updateSlaveFrameworks( http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index 8b5848b..bf7ae65 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -292,28 +292,25 @@ Option<Error> registerSlave( Option<Error> reregisterSlave( - const SlaveInfo& slaveInfo, - const vector<Task>& tasks, - const vector<Resource>& resources, - const vector<ExecutorInfo>& executorInfos, - const vector<FrameworkInfo>& frameworkInfos) + const ReregisterSlaveMessage& message) { hashset<FrameworkID> frameworkIDs; hashset<pair<FrameworkID, ExecutorID>> executorIDs; + const SlaveInfo& slaveInfo = message.slave(); Option<Error> error = validateSlaveInfo(slaveInfo); if (error.isSome()) { return error.get(); } - foreach (const Resource& resource, resources) { + foreach (const Resource& resource, message.checkpointed_resources()) { Option<Error> error = Resources::validate(resource); if (error.isSome()) { return 
error.get(); } } - foreach (const FrameworkInfo& framework, frameworkInfos) { + foreach (const FrameworkInfo& framework, message.frameworks()) { Option<Error> error = validation::framework::validate(framework); if (error.isSome()) { return error.get(); @@ -327,7 +324,7 @@ Option<Error> reregisterSlave( frameworkIDs.insert(framework.id()); } - foreach (const ExecutorInfo& executor, executorInfos) { + foreach (const ExecutorInfo& executor, message.executor_infos()) { Option<Error> error = validation::executor::validate(executor); if (error.isSome()) { return error.get(); @@ -362,7 +359,7 @@ Option<Error> reregisterSlave( } } - foreach (const Task& task, tasks) { + foreach (const Task& task, message.tasks()) { Option<Error> error = common::validation::validateTaskID(task.task_id()); if (error.isSome()) { return Error("Task has an invalid TaskID: " + error->message); http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/master/validation.hpp ---------------------------------------------------------------------- diff --git a/src/master/validation.hpp b/src/master/validation.hpp index ac54062..30db3bf 100644 --- a/src/master/validation.hpp +++ b/src/master/validation.hpp @@ -71,11 +71,7 @@ Option<Error> registerSlave( const std::vector<Resource>& checkpointedResources); Option<Error> reregisterSlave( - const SlaveInfo& slaveInfo, - const std::vector<Task>& tasks, - const std::vector<Resource>& resources, - const std::vector<ExecutorInfo>& executorInfos, - const std::vector<FrameworkInfo>& frameworkInfos); + const ReregisterSlaveMessage& message); } // namespace message { } // namespace master { http://git-wip-us.apache.org/repos/asf/mesos/blob/8b422804/src/tests/master_validation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp index 6398f16..48c69a5 100644 --- a/src/tests/master_validation_tests.cpp +++ 
b/src/tests/master_validation_tests.cpp @@ -4215,43 +4215,46 @@ TEST_F(RegisterSlaveValidationTest, DropInvalidRegistration) // validating the ReregisterSlaveMessage. TEST_F(RegisterSlaveValidationTest, DuplicateExecutorID) { - SlaveInfo slaveInfo; - slaveInfo.mutable_id()->set_value("agent-id"); - slaveInfo.mutable_resources()->CopyFrom( - Resources::parse("cpus:2;mem:10").get()); + ReregisterSlaveMessage message; - vector<Task> tasks; - vector<Resource> resources; - vector<ExecutorInfo> executors; - vector<FrameworkInfo> frameworks; + SlaveInfo *slaveInfo = message.mutable_slave(); + slaveInfo->mutable_id()->set_value("agent-id"); + slaveInfo->mutable_resources()->CopyFrom( + Resources::parse("cpus:2;mem:10").get()); - frameworks.push_back(DEFAULT_FRAMEWORK_INFO); - frameworks.back().set_name("framework1"); - frameworks.back().mutable_id()->set_value("framework1"); + FrameworkInfo *framework = message.add_frameworks(); + framework->CopyFrom(DEFAULT_FRAMEWORK_INFO); + framework->set_name("framework1"); + framework->mutable_id()->set_value("framework1"); - frameworks.push_back(DEFAULT_FRAMEWORK_INFO); - frameworks.back().set_name("framework2"); - frameworks.back().mutable_id()->set_value("framework2"); + framework = message.add_frameworks(); + framework->CopyFrom(DEFAULT_FRAMEWORK_INFO); + framework->set_name("framework2"); + framework->mutable_id()->set_value("framework2"); - executors.push_back(DEFAULT_EXECUTOR_INFO); - executors.back().mutable_framework_id()->set_value("framework1"); + ExecutorInfo *executor = message.add_executor_infos(); + executor->CopyFrom(DEFAULT_EXECUTOR_INFO); + executor->mutable_framework_id()->set_value("framework1"); - executors.push_back(DEFAULT_EXECUTOR_INFO); - executors.back().mutable_framework_id()->set_value("framework2"); + executor = message.add_executor_infos(); + executor->CopyFrom(DEFAULT_EXECUTOR_INFO); + executor->mutable_framework_id()->set_value("framework2"); // Executors with the same ID in different frameworks are 
allowed. - EXPECT_EQ(executors[0].executor_id(), executors[1].executor_id()); - EXPECT_NE(executors[0].framework_id(), executors[1].framework_id()); - EXPECT_NONE(master::validation::master::message::reregisterSlave( - slaveInfo, tasks, resources, executors, frameworks)); + EXPECT_EQ(message.executor_infos(0).executor_id(), + message.executor_infos(1).executor_id()); + EXPECT_NE(message.executor_infos(0).framework_id(), + message.executor_infos(1).framework_id()); + EXPECT_NONE(master::validation::master::message::reregisterSlave(message)); - executors[1].mutable_framework_id()->set_value("framework1"); + executor->mutable_framework_id()->set_value("framework1"); // Executors with the same ID in in the same framework are not allowed. - EXPECT_EQ(executors[0].executor_id(), executors[1].executor_id()); - EXPECT_EQ(executors[0].framework_id(), executors[1].framework_id()); - EXPECT_SOME(master::validation::master::message::reregisterSlave( - slaveInfo, tasks, resources, executors, frameworks)); + EXPECT_EQ(message.executor_infos(0).executor_id(), + message.executor_infos(1).executor_id()); + EXPECT_EQ(message.executor_infos(0).framework_id(), + message.executor_infos(1).framework_id()); + EXPECT_SOME(master::validation::master::message::reregisterSlave(message)); } } // namespace tests {
