Repository: mesos Updated Branches: refs/heads/master 957b4f280 -> 54e47b443
Added parameter validation to ReregisterSlaveMessage. The ReregisterSlaveMessage message carries a number of fields that have internal consistency requirements. Add some simple validation checks to ensure that we have a minimally consistent re-registration request before proceeding. Review: https://reviews.apache.org/r/58305/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b22c9e73 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b22c9e73 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b22c9e73 Branch: refs/heads/master Commit: b22c9e73375b38c98a25c337e0d5d13e2c826573 Parents: 957b4f2 Author: James Peach <[email protected]> Authored: Wed Apr 26 15:36:12 2017 -0400 Committer: Neil Conway <[email protected]> Committed: Wed Apr 26 16:01:29 2017 -0400 ---------------------------------------------------------------------- src/master/master.cpp | 10 +++ src/master/validation.cpp | 135 +++++++++++++++++++++++++++-- src/master/validation.hpp | 16 ++++ src/tests/master_validation_tests.cpp | 54 ++++++++++++ 4 files changed, 208 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b22c9e73/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index d1cdc35..9814df2 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -5567,6 +5567,16 @@ void Master::reregisterSlave( return; } + Option<Error> error = validation::master::message::reregisterSlave( + slaveInfo, tasks, checkpointedResources, executorInfos, frameworks); + + if (error.isSome()) { + LOG(WARNING) << "Dropping re-registration of agent at " << from + << " because it sent an invalid re-registration: " + << error->message; + return; + } + MachineID machineId; machineId.set_hostname(slaveInfo.hostname()); 
machineId.set_ip(stringify(from.address.ip)); http://git-wip-us.apache.org/repos/asf/mesos/blob/b22c9e73/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index 3f70875..768fc35 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -228,6 +228,105 @@ Option<Error> validate( } } // namespace call { + +namespace message { + +Option<Error> reregisterSlave( + const SlaveInfo& slaveInfo, + const vector<Task>& tasks, + const vector<Resource>& resources, + const vector<ExecutorInfo>& executorInfos, + const vector<FrameworkInfo>& frameworkInfos) +{ + hashset<FrameworkID> frameworkIDs; + hashset<ExecutorID> executorIDs; + + foreach (const Resource& resource, resources) { + Option<Error> error = Resources::validate(resource); + if (error.isSome()) { + return error.get(); + } + } + + foreach (const FrameworkInfo& framework, frameworkInfos) { + Option<Error> error = validation::framework::validate(framework); + if (error.isSome()) { + return error.get(); + } + + if (frameworkIDs.contains(framework.id())) { + return Error("Framework has a duplicate FrameworkID: '" + + stringify(framework.id()) + "'"); + } + + frameworkIDs.insert(framework.id()); + } + + foreach (const ExecutorInfo& executor, executorInfos) { + Option<Error> error = validation::executor::validate(executor); + if (error.isSome()) { + return error.get(); + } + + // We don't use internal::validateResources() here because + // that includes the validateAllocatedToSingleRole() check, + // which is not valid for agent re-registration. 
+ error = Resources::validate(executor.resources()); + if (error.isSome()) { + return error.get(); + } + + if (!frameworkIDs.contains(executor.framework_id())) { + return Error("Executor has an invalid FrameworkID '" + + stringify(executor.framework_id()) + "'"); + } + + if (executor.has_executor_id()) { + if (executorIDs.contains(executor.executor_id())) { + return Error("Executor has a duplicate ExecutorID '" + + stringify(executor.executor_id()) + "'"); + } + + executorIDs.insert(executor.executor_id()); + } + } + + foreach (const Task& task, tasks) { + Option<Error> error = common::validation::validateTaskID(task.task_id()); + if (error.isSome()) { + return Error("Task has an invalid TaskID: " + error->message); + } + + if (task.slave_id() != slaveInfo.id()) { + return Error("Task has an invalid SlaveID '" + + stringify(task.slave_id()) + "'"); + } + + if (!frameworkIDs.contains(task.framework_id())) { + return Error("Task has an invalid FrameworkID '" + + stringify(task.framework_id()) + "'"); + } + + // Command Executors don't send the executor ID in the task because it + // is generated on the agent (see Slave::doReliableRegistration). Only + // running tasks ought to have executors. 
+ if (task.has_executor_id() && task.state() == TASK_RUNNING) { + if (!executorIDs.contains(task.executor_id())) { + return Error("Task has an invalid ExecutorID '" + + stringify(task.executor_id()) + "'"); + } + } + + error = resource::validate(task.resources()); + if (error.isSome()) { + return Error("Task uses invalid resources: " + error->message); + } + } + + return None(); +} + +} // namespace message { } // namespace master { @@ -827,19 +926,20 @@ Option<Error> validate( CHECK_NOTNULL(framework); CHECK_NOTNULL(slave); - vector<lambda::function<Option<Error>()>> validators = { - lambda::bind(internal::validateType, executor), - lambda::bind(internal::validateExecutorID, executor), + Option<Error> error = executor::validate(executor); + if (error.isSome()) { + return error; + } + + const vector<lambda::function<Option<Error>()>> executorValidators = { lambda::bind(internal::validateFrameworkID, executor, framework), - lambda::bind(internal::validateShutdownGracePeriod, executor), lambda::bind(internal::validateResources, executor), lambda::bind( internal::validateCompatibleExecutorInfo, executor, framework, slave), - lambda::bind(internal::validateCommandInfo, executor) }; - foreach (const lambda::function<Option<Error>()>& validator, validators) { - Option<Error> error = validator(); + foreach (const auto& validator, executorValidators) { + error = validator(); if (error.isSome()) { return error; } @@ -849,6 +949,27 @@ Option<Error> validate( } } // namespace internal { + +Option<Error> validate(const ExecutorInfo& executor) +{ + const vector<lambda::function<Option<Error>(const ExecutorInfo&)>> + executorValidators = { + internal::validateType, + internal::validateExecutorID, + internal::validateShutdownGracePeriod, + internal::validateCommandInfo, + }; + + foreach (const auto& validator, executorValidators) { + Option<Error> error = validator(executor); + if (error.isSome()) { + return error.get(); + } + } + + return None(); +} + } // namespace executor { 
http://git-wip-us.apache.org/repos/asf/mesos/blob/b22c9e73/src/master/validation.hpp ---------------------------------------------------------------------- diff --git a/src/master/validation.hpp b/src/master/validation.hpp index d96287d..5f01a67 100644 --- a/src/master/validation.hpp +++ b/src/master/validation.hpp @@ -17,6 +17,8 @@ #ifndef __MASTER_VALIDATION_HPP__ #define __MASTER_VALIDATION_HPP__ +#include <vector> + #include <google/protobuf/repeated_field.h> #include <mesos/mesos.hpp> @@ -52,6 +54,17 @@ Option<Error> validate( const Option<process::http::authentication::Principal>& principal = None()); } // namespace call { + +namespace message { + +Option<Error> reregisterSlave( + const SlaveInfo& slaveInfo, + const std::vector<Task>& tasks, + const std::vector<Resource>& resources, + const std::vector<ExecutorInfo>& executorInfos, + const std::vector<FrameworkInfo>& frameworkInfos); + +} // namespace message { } // namespace master { @@ -130,6 +143,9 @@ Option<Error> validateType(const ExecutorInfo& executor); Option<Error> validateResources(const ExecutorInfo& executor); } // namespace internal { + +Option<Error> validate(const ExecutorInfo& executor); + } // namespace executor { http://git-wip-us.apache.org/repos/asf/mesos/blob/b22c9e73/src/tests/master_validation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp index 5553808..1e0e764 100644 --- a/src/tests/master_validation_tests.cpp +++ b/src/tests/master_validation_tests.cpp @@ -3726,6 +3726,60 @@ TEST_F(FrameworkInfoValidationTest, RoleChangeWithMultiRoleMasterFailover) } } + +class RegisterSlaveValidationTest : public MesosTest {}; + + +TEST_F(RegisterSlaveValidationTest, DropInvalidReregistration) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), 
master.get()->pid, _); + + StandaloneMasterDetector detector(master.get()->pid); + + Try<Owned<cluster::Slave>> slave = StartSlave(&detector); + ASSERT_SOME(slave); + + // Wait until the master acknowledges the slave registration. + AWAIT_READY(slaveRegisteredMessage); + + // Drop and capture the slave's ReregisterSlaveMessage. + Future<ReregisterSlaveMessage> reregisterSlaveMessage = + DROP_PROTOBUF(ReregisterSlaveMessage(), slave.get()->pid, _); + + // Simulate a new master detected event on the slave, + // so that the slave will do a re-registration. + detector.appoint(master.get()->pid); + + AWAIT_READY(reregisterSlaveMessage); + + // Now that we have a valid ReregisterSlaveMessage, tweak it to + // fail validation. + ReregisterSlaveMessage message = reregisterSlaveMessage.get(); + + Task* task = message.add_tasks(); + task->set_name(UUID::random().toString()); + task->mutable_slave_id()->set_value(UUID::random().toString()); + task->mutable_task_id()->set_value(UUID::random().toString()); + task->mutable_framework_id()->set_value(UUID::random().toString()); + task->mutable_executor_id()->set_value(UUID::random().toString()); + task->set_state(TASK_RUNNING); + + // We expect the master to drop the ReregisterSlaveMessage, so it + // will not send any more SlaveReregisteredMessage responses. + EXPECT_NO_FUTURE_PROTOBUFS(SlaveReregisteredMessage(), _, _); + + // Send the modified message to the master. + process::post(slave.get()->pid, master->get()->pid, message); + + // Settle the clock to retire in-flight messages. + Clock::pause(); + Clock::settle(); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
