Applied RegisterAgent ACL to the master. Review: https://reviews.apache.org/r/57535
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/29fc2dfc Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/29fc2dfc Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/29fc2dfc Branch: refs/heads/master Commit: 29fc2dfcb110a51923d4d7c144bdd797b348f96b Parents: 38afa9c Author: Jiang Yan Xu <[email protected]> Authored: Fri Mar 10 15:25:43 2017 -0800 Committer: Jiang Yan Xu <[email protected]> Committed: Fri Apr 28 14:55:08 2017 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 247 +++++++++++++++++++++----- src/master/master.hpp | 39 +++- src/tests/master_authorization_tests.cpp | 162 +++++++++++++++++ 3 files changed, 394 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/29fc2dfc/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index e8c2a96..2be4056 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3651,6 +3651,31 @@ Future<bool> Master::authorizeDestroyVolume( } +Future<bool> Master::authorizeSlave(const Option<string>& principal) +{ + if (authorizer.isNone()) { + return true; + } + + LOG(INFO) << "Authorizing agent " + << (principal.isSome() + ? "with principal '" + principal.get() + "'" + : "without a principal"); + + authorization::Request request; + request.set_action(authorization::REGISTER_AGENT); + + if (principal.isSome()) { + request.mutable_subject()->set_value(principal.get()); + } + + // No need to set the request's object as it is implicitly set to + // ANY by the authorizer.
+ + return authorizer.get()->authorized(request); +} + + Resources Master::addTask( const TaskInfo& task, Framework* framework, @@ -5388,26 +5413,91 @@ void Master::registerSlave( return; } + if (slaves.registering.contains(from)) { + LOG(INFO) << "Ignoring register agent message from " << from + << " (" << slaveInfo.hostname() << ") as registration" + << " is already in progress"; + return; + } + + slaves.registering.insert(from); + + // Note that the principal may be empty if authentication is not + // required. Also it is passed along because it may be removed from + // `authenticated` while the authorization is pending. + Option<string> principal = authenticated.get(from); + + authorizeSlave(principal) + .onAny(defer(self(), + &Self::_registerSlave, + slaveInfo, + from, + principal, + checkpointedResources, + version, + agentCapabilities, + lambda::_1)); +} + + +void Master::_registerSlave( + const SlaveInfo& slaveInfo, + const UPID& pid, + const Option<string>& principal, + const vector<Resource>& checkpointedResources, + const string& version, + const vector<SlaveInfo::Capability>& agentCapabilities, + const Future<bool>& authorized) +{ + CHECK(!authorized.isDiscarded()); + CHECK(slaves.registering.contains(pid)); + + Option<string> authorizationError = None(); + + if (authorized.isFailed()) { + authorizationError = "Authorization failure: " + authorized.failure(); + } else if (!authorized.get()) { + authorizationError = + "Not authorized to register as agent " + + (principal.isSome() + ?
"with principal '" + principal.get() + "'" + : "without a principal"); + } + + if (authorizationError.isSome()) { + LOG(WARNING) << "Refusing registration of agent at " << pid + << ": " << authorizationError.get(); + + ShutdownMessage message; + message.set_message(authorizationError.get()); + send(pid, message); + + slaves.registering.erase(pid); + return; + } + MachineID machineId; machineId.set_hostname(slaveInfo.hostname()); - machineId.set_ip(stringify(from.address.ip)); + machineId.set_ip(stringify(pid.address.ip)); // Slaves are not allowed to register while the machine they are on is in // `DOWN` mode. if (machines.contains(machineId) && machines[machineId].info.mode() == MachineInfo::DOWN) { - LOG(WARNING) << "Refusing registration of agent at " << from + LOG(WARNING) << "Refusing registration of agent at " << pid << " because the machine '" << machineId << "' that it is " << "running on is `DOWN`"; ShutdownMessage message; message.set_message("Machine is `DOWN`"); - send(from, message); + send(pid, message); + + slaves.registering.erase(pid); return; } // Check if this slave is already registered (because it retries). - if (Slave* slave = slaves.registered.get(from)) { + if (Slave* slave = slaves.registered.get(pid)) { if (!slave->connected) { // The slave was previously disconnected but it is now trying // to register as a new slave. This could happen if the slave @@ -5433,33 +5523,25 @@ void Master::registerSlave( SlaveRegisteredMessage message; message.mutable_slave_id()->CopyFrom(slave->id); message.mutable_connection()->CopyFrom(connection); - send(from, message); + send(pid, message); + + slaves.registering.erase(pid); return; } } - // We need to generate a SlaveID and admit this slave only *once*.
- if (slaves.registering.contains(from)) { - LOG(INFO) << "Ignoring register agent message from " << from - << " (" << slaveInfo.hostname() << ") as admission is" - << " already in progress"; - return; - } - - slaves.registering.insert(from); - // Create and add the slave id. SlaveInfo slaveInfo_ = slaveInfo; slaveInfo_.mutable_id()->CopyFrom(newSlaveId()); - LOG(INFO) << "Registering agent at " << from << " (" + LOG(INFO) << "Registering agent at " << pid << " (" << slaveInfo.hostname() << ") with id " << slaveInfo_.id(); registrar->apply(Owned<Operation>(new AdmitSlave(slaveInfo_))) .onAny(defer(self(), - &Self::_registerSlave, + &Self::__registerSlave, slaveInfo_, - from, + pid, checkpointedResources, version, agentCapabilities, @@ -5467,7 +5549,7 @@ void Master::registerSlave( } -void Master::_registerSlave( +void Master::__registerSlave( const SlaveInfo& slaveInfo, const UPID& pid, const vector<Resource>& checkpointedResources, @@ -5476,7 +5558,6 @@ void Master::_registerSlave( const Future<bool>& admit) { CHECK(slaves.registering.contains(pid)); - slaves.registering.erase(pid); CHECK(!admit.isDiscarded()); @@ -5495,6 +5576,8 @@ void Master::_registerSlave( << " (" << slaveInfo.hostname() << ") was assigned" << " an agent ID that already appears in the registry;" << " ignoring registration attempt"; + + slaves.registering.erase(pid); return; } @@ -5528,6 +5611,8 @@ void Master::_registerSlave( LOG(INFO) << "Registered agent " << *slave << " with " << slave->info.resources(); + + slaves.registering.erase(pid); } @@ -5585,21 +5670,95 @@ void Master::reregisterSlave( return; } + if (slaves.reregistering.contains(slaveInfo.id())) { + LOG(INFO) + << "Ignoring re-register agent message from agent " + << slaveInfo.id() << " at " << from << " (" + << slaveInfo.hostname() << ") as re-registration is already in progress"; + return; + } + + slaves.reregistering.insert(slaveInfo.id()); + + // Note that the principal may be empty if authentication is not + // required.
Also it is passed along because it may be removed from + // `authenticated` while the authorization is pending. + Option<string> principal = authenticated.get(from); + + authorizeSlave(principal) + .onAny(defer(self(), + &Self::_reregisterSlave, + slaveInfo, + from, + principal, + checkpointedResources, + executorInfos, + tasks, + frameworks, + completedFrameworks, + version, + agentCapabilities, + lambda::_1)); +} + + +void Master::_reregisterSlave( + const SlaveInfo& slaveInfo, + const UPID& pid, + const Option<string>& principal, + const vector<Resource>& checkpointedResources, + const vector<ExecutorInfo>& executorInfos, + const vector<Task>& tasks, + const vector<FrameworkInfo>& frameworks, + const vector<Archive::Framework>& completedFrameworks, + const string& version, + const vector<SlaveInfo::Capability>& agentCapabilities, + const Future<bool>& authorized) +{ + CHECK(!authorized.isDiscarded()); + CHECK(slaves.reregistering.contains(slaveInfo.id())); + + Option<string> authorizationError = None(); + + if (authorized.isFailed()) { + authorizationError = "Authorization failure: " + authorized.failure(); + } else if (!authorized.get()) { + authorizationError = + "Not authorized to re-register as agent " + + (principal.isSome() + ? "with principal '" + principal.get() + "'" + : "without a principal"); + } + + if (authorizationError.isSome()) { + LOG(WARNING) << "Refusing re-registration of agent at " << pid + << ": " << authorizationError.get(); + + ShutdownMessage message; + message.set_message(authorizationError.get()); + send(pid, message); + + slaves.reregistering.erase(slaveInfo.id()); + return; + } + MachineID machineId; machineId.set_hostname(slaveInfo.hostname()); - machineId.set_ip(stringify(from.address.ip)); + machineId.set_ip(stringify(pid.address.ip)); - // Slaves are not allowed to register while the machine they are on is in + // Slaves are not allowed to re-register while the machine they are on is in // 'DOWN` mode.
if (machines.contains(machineId) && machines[machineId].info.mode() == MachineInfo::DOWN) { - LOG(WARNING) << "Refusing re-registration of agent at " << from + LOG(WARNING) << "Refusing re-registration of agent at " << pid << " because the machine '" << machineId << "' that it is " << "running on is `DOWN`"; ShutdownMessage message; message.set_message("Machine is `DOWN`"); - send(from, message); + send(pid, message); + + slaves.reregistering.erase(slaveInfo.id()); return; } @@ -5619,9 +5778,9 @@ void Master::reregisterSlave( // hostname. This is because maintenance is scheduled at the // machine level; so we would need to re-validate the slave's // unavailability if the machine it is running on changed. - if (slave->pid.address.ip != from.address.ip || + if (slave->pid.address.ip != pid.address.ip || slave->info.hostname() != slaveInfo.hostname()) { - LOG(WARNING) << "Agent " << slaveInfo.id() << " at " << from + LOG(WARNING) << "Agent " << slaveInfo.id() << " at " << pid << " (" << slaveInfo.hostname() << ") attempted to " << "re-register with different IP / hostname; expected " << slave->pid.address.ip << " (" << slave->info.hostname() @@ -5631,7 +5790,9 @@ void Master::reregisterSlave( message.set_message( "Agent attempted to re-register with different IP / hostname"); - send(from, message); + send(pid, message); + + slaves.reregistering.erase(slaveInfo.id()); return; } @@ -5641,7 +5802,7 @@ void Master::reregisterSlave( // in succession for a disconnected slave. As a result, we // ignore duplicate exited events for disconnected slaves. // See: https://issues.apache.org/jira/browse/MESOS-675 - slave->pid = from; + slave->pid = pid; link(slave->pid); // Update slave's version, re-registration timestamp and @@ -5676,26 +5837,15 @@ void Master::reregisterSlave( // Inform the agent of the master's version of its checkpointed // resources and the new framework pids for its tasks.
- __reregisterSlave(slave, tasks, frameworks); - - return; - } + ___reregisterSlave(slave, tasks, frameworks); - // If we're already re-registering this slave, then no need to ask - // the registrar again. - if (slaves.reregistering.contains(slaveInfo.id())) { - LOG(INFO) - << "Ignoring re-register agent message from agent " - << slaveInfo.id() << " at " << from << " (" - << slaveInfo.hostname() << ") as readmission is already in progress"; + slaves.reregistering.erase(slaveInfo.id()); return; } - LOG(INFO) << "Re-registering agent " << slaveInfo.id() << " at " << from + LOG(INFO) << "Re-registering agent " << slaveInfo.id() << " at " << pid << " (" << slaveInfo.hostname() << ")"; - slaves.reregistering.insert(slaveInfo.id()); - // Consult the registry to determine whether to readmit the // slave. In the common case, the slave has been marked unreachable // by the master, so we move the slave to the reachable list and @@ -5704,9 +5854,9 @@ void Master::reregisterSlave( // GC'd), we admit the slave anyway.
registrar->apply(Owned<Operation>(new MarkSlaveReachable(slaveInfo))) .onAny(defer(self(), - &Self::_reregisterSlave, + &Self::__reregisterSlave, slaveInfo, - from, + pid, checkpointedResources, executorInfos, tasks, @@ -5718,7 +5868,7 @@ void Master::reregisterSlave( } -void Master::_reregisterSlave( +void Master::__reregisterSlave( const SlaveInfo& slaveInfo, const UPID& pid, const vector<Resource>& checkpointedResources, @@ -5731,7 +5881,6 @@ void Master::_reregisterSlave( const Future<bool>& readmit) { CHECK(slaves.reregistering.contains(slaveInfo.id())); - slaves.reregistering.erase(slaveInfo.id()); if (readmit.isFailed()) { LOG(FATAL) << "Failed to readmit agent " << slaveInfo.id() << " at " << pid @@ -5966,11 +6115,13 @@ void Master::_reregisterSlave( } } - __reregisterSlave(slave, tasks, frameworks); + ___reregisterSlave(slave, tasks, frameworks); + + slaves.reregistering.erase(slaveInfo.id()); } -void Master::__reregisterSlave( +void Master::___reregisterSlave( Slave* slave, const vector<Task>& tasks, const vector<FrameworkInfo>& frameworks) http://git-wip-us.apache.org/repos/asf/mesos/blob/29fc2dfc/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index d537933..eca353b 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -514,6 +514,15 @@ protected: void _registerSlave( const SlaveInfo& slaveInfo, const process::UPID& pid, + const Option<std::string>& principal, + const std::vector<Resource>& checkpointedResources, + const std::string& version, + const std::vector<SlaveInfo::Capability>& agentCapabilities, + const process::Future<bool>& authorized); + + void __registerSlave( + const SlaveInfo& slaveInfo, + const process::UPID& pid, const std::vector<Resource>& checkpointedResources, const std::string& version, const std::vector<SlaveInfo::Capability>& agentCapabilities, @@ -522,6 +531,7 @@ protected: void _reregisterSlave( const SlaveInfo&
slaveInfo, const process::UPID& pid, + const Option<std::string>& principal, const std::vector<Resource>& checkpointedResources, const std::vector<ExecutorInfo>& executorInfos, const std::vector<Task>& tasks, @@ -529,9 +539,21 @@ protected: const std::vector<Archive::Framework>& completedFrameworks, const std::string& version, const std::vector<SlaveInfo::Capability>& agentCapabilities, - const process::Future<bool>& readmit); + const process::Future<bool>& authorized); void __reregisterSlave( + const SlaveInfo& slaveInfo, + const process::UPID& pid, + const std::vector<Resource>& checkpointedResources, + const std::vector<ExecutorInfo>& executorInfos, + const std::vector<Task>& tasks, + const std::vector<FrameworkInfo>& frameworks, + const std::vector<Archive::Framework>& completedFrameworks, + const std::string& version, + const std::vector<SlaveInfo::Capability>& agentCapabilities, + const process::Future<bool>& readmit); + + void ___reregisterSlave( Slave* slave, const std::vector<Task>& tasks, const std::vector<FrameworkInfo>& frameworks); @@ -659,6 +681,9 @@ protected: process::Future<bool> authorizeFramework( const FrameworkInfo& frameworkInfo); + // Returns whether the principal is authorized to (re-)register an agent. + process::Future<bool> authorizeSlave(const Option<std::string>& principal); + // Returns whether the task is authorized. // Returns failure for transient authorization failures. process::Future<bool> authorizeTask( @@ -1623,14 +1648,16 @@ private: // failover. Slaves are removed from this collection when they // either re-register with the master or are marked unreachable // because they do not re-register before `recoveredTimer` fires. + // We must not answer questions related to these slaves (e.g., + // during task reconciliation) until we determine their fate + // because they are in this transitioning state. hashmap<SlaveID, SlaveInfo> recovered; - // Slaves that are in the process of registering.
+ // Agents that are in the process of (re-)registering. They are + // maintained here while the (re-)registration is in progress and + // possibly pending in the authorizer or the registrar in order + // to help deduplicate (re-)registration requests. hashset<process::UPID> registering; - - // Only those slaves that are re-registering for the first time - // with this master. We must not answer questions related to - // these slaves until the registrar determines their fate. hashset<SlaveID> reregistering; // Registered slaves are indexed by SlaveID and UPID. Note that http://git-wip-us.apache.org/repos/asf/mesos/blob/29fc2dfc/src/tests/master_authorization_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index 1a0285a..a646768 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -47,6 +47,7 @@ #include "messages/messages.hpp" +#include "slave/constants.hpp" #include "slave/slave.hpp" #include "tests/containerizer.hpp" @@ -67,6 +68,7 @@ using mesos::master::detector::StandaloneMasterDetector; using process::Clock; using process::Future; +using process::Message; using process::Owned; using process::PID; using process::Promise; @@ -82,6 +84,7 @@ using testing::_; using testing::An; using testing::AtMost; using testing::DoAll; +using testing::Eq; using testing::Return; namespace mesos { @@ -2323,6 +2326,165 @@ TYPED_TEST(MasterAuthorizerTest, FilterOrphanedTasks) driver.join(); } + +TEST_F(MasterAuthorizationTest, AuthorizedToRegisterAndReregisterAgent) +{ + // Set up ACLs so that the agent can (re-)register.
+ ACLs acls; + mesos::ACL::RegisterAgent* acl = acls.add_register_agents(); + acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal()); + acl->mutable_agent()->set_type(ACL::Entity::ANY); + + master::Flags masterFlags = CreateMasterFlags(); + masterFlags.acls = acls; + + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + slave::Flags slaveFlags = CreateSlaveFlags(); + Owned<MasterDetector> detector = master.get()->createDetector(); + + Future<Message> slaveRegisteredMessage = + FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // Simulate a recovered agent and verify that it is allowed to reregister. + slave->reset(); + + Future<Message> slaveReregisteredMessage = + FUTURE_MESSAGE(Eq(SlaveReregisteredMessage().GetTypeName()), _, _); + + slave = StartSlave(detector.get(), slaveFlags); + + AWAIT_READY(slaveReregisteredMessage); +} + + +// This test verifies that the agent is shut down by the master if +// it is not authorized to register. +TEST_F(MasterAuthorizationTest, UnauthorizedToRegisterAgent) +{ + // Set up ACLs that disallow the agent's principal to register.
+ ACLs acls; + mesos::ACL::RegisterAgent* acl = acls.add_register_agents(); + acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal()); + acl->mutable_agent()->set_type(ACL::Entity::NONE); + + master::Flags flags = CreateMasterFlags(); + flags.acls = acls; + + Try<Owned<cluster::Master>> master = StartMaster(flags); + ASSERT_SOME(master); + + Future<Message> shutdownMessage = + FUTURE_MESSAGE(Eq(ShutdownMessage().GetTypeName()), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + AWAIT_READY(shutdownMessage); +} + + +// This test verifies that an agent authorized to register can be +// unauthorized to re-register due to master ACL change (after failover). +TEST_F(MasterAuthorizationTest, UnauthorizedToReregisterAgent) +{ + // Set up ACLs so that the agent can register. + ACLs acls; + mesos::ACL::RegisterAgent* acl = acls.add_register_agents(); + acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal()); + acl->mutable_agent()->set_type(ACL::Entity::ANY); + + master::Flags flags = CreateMasterFlags(); + flags.acls = acls; + + Try<Owned<cluster::Master>> master = StartMaster(flags); + ASSERT_SOME(master); + + slave::Flags slaveFlags = CreateSlaveFlags(); + StandaloneMasterDetector detector(master.get()->pid); + + Future<Message> slaveRegisteredMessage = + FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // Master fails over. + master->reset(); + + // The new master doesn't allow this agent principal to re-register.
+ acl->mutable_agent()->set_type(ACL::Entity::NONE); + flags.acls = acls; + + Future<Message> shutdownMessage = + FUTURE_MESSAGE(Eq(ShutdownMessage().GetTypeName()), _, _); + + master = StartMaster(flags); + ASSERT_SOME(master); + + detector.appoint(master.get()->pid); + + AWAIT_READY(shutdownMessage); +} + + +// This test verifies that duplicate agent registration attempts are +// ignored when the ongoing registration is pending in the authorizer. +TEST_F(MasterAuthorizationTest, RetryRegisterAgent) +{ + // Use a paused clock to control agent registration retries. + Clock::pause(); + + MockAuthorizer authorizer; + Try<Owned<cluster::Master>> master = StartMaster(&authorizer); + ASSERT_SOME(master); + + // Return a pending future from authorizer. + Future<Nothing> authorize; + Promise<bool> promise; + + // Expect the authorizer to be called only once, i.e., + // the retry is ignored. + EXPECT_CALL(authorizer, authorized(_)) + .WillOnce(DoAll(FutureSatisfy(&authorize), + Return(promise.future()))); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + // Trigger the first registration attempt (with authentication). + Clock::advance(slave::DEFAULT_REGISTRATION_BACKOFF_FACTOR); + + // Wait until the authorization is in progress. + AWAIT_READY(authorize); + + // Advance to trigger the second registration attempt. + Clock::advance(slave::REGISTER_RETRY_INTERVAL_MAX); + + // Settle to make sure the second registration attempt is received + // by the master. We can verify that it's ignored if the EXPECT_CALL + // above doesn't oversaturate. + Clock::settle(); + + Future<Message> slaveRegisteredMessage = + FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _); + + // Now authorize the agent and verify it's registered. + promise.set(true); + + AWAIT_READY(slaveRegisteredMessage); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
