This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit b920f60bcee913319fb78427f28f6f5789729478 Author: Andrei Sekretenko <[email protected]> AuthorDate: Tue Jan 21 21:58:23 2020 +0100 Store per-framework ObjectApprovers. This is a prerequisite to synchronous authorization of scheduler API calls (see MESOS-10056). Review: https://reviews.apache.org/r/72096 --- src/master/framework.cpp | 65 ++++++++++++-- src/master/master.cpp | 141 ++++++++++++++++++++----------- src/master/master.hpp | 41 +++++++-- src/tests/master_authorization_tests.cpp | 89 +++++++++++-------- src/tests/master_load_tests.cpp | 23 ++++- 5 files changed, 256 insertions(+), 103 deletions(-) diff --git a/src/master/framework.cpp b/src/master/framework.cpp index 85d9951..ffcf367 100644 --- a/src/master/framework.cpp +++ b/src/master/framework.cpp @@ -19,6 +19,13 @@ #include "common/heartbeater.hpp" #include "common/protobuf_utils.hpp" +using process::Failure; +using process::Future; +using process::Owned; +using process::http::authentication::Principal; + +using mesos::authorization::ActionObject; + namespace mesos { namespace internal { namespace master { @@ -28,8 +35,9 @@ Framework::Framework( const Flags& masterFlags, const FrameworkInfo& info, const process::UPID& pid, + const Owned<ObjectApprovers>& approvers, const process::Time& time) - : Framework(master, masterFlags, info, CONNECTED, true, time) + : Framework(master, masterFlags, info, CONNECTED, true, approvers, time) { pid_ = pid; } @@ -40,8 +48,9 @@ Framework::Framework( const Flags& masterFlags, const FrameworkInfo& info, const StreamingHttpConnection<v1::scheduler::Event>& http, + const Owned<ObjectApprovers>& approvers, const process::Time& time) - : Framework(master, masterFlags, info, CONNECTED, true, time) + : Framework(master, masterFlags, info, CONNECTED, true, approvers, time) { http_ = http; } @@ -51,7 +60,8 @@ Framework::Framework( Master* const master, const Flags& masterFlags, const FrameworkInfo& info) - : Framework(master, masterFlags, info, RECOVERED, false, process::Time()) + : Framework( + master, masterFlags, info, RECOVERED, false, nullptr, process::Time()) {} @@ -61,6 +71,7 @@ Framework::Framework( const FrameworkInfo& _info, State state, bool active_, + const Owned<ObjectApprovers>& approvers, const process::Time& time) : master(_master), info(_info), @@ -72,7 +83,8 @@ Framework::Framework( unreachableTasks(masterFlags.max_unreachable_tasks_per_framework), metrics(_info, masterFlags.publish_per_framework_metrics), active_(active_), - state(state) + state(state), + objectApprovers(approvers) { CHECK(_info.has_id()); @@ -559,7 +571,9 @@ void Framework::update(const FrameworkInfo& newInfo) } -void Framework::updateConnection(const process::UPID& newPid) +void Framework::updateConnection( + const process::UPID& newPid, + const Owned<ObjectApprovers>& objectApprovers_) { // Cleanup the old connection state if exists. disconnect(); @@ -567,12 +581,14 @@ void Framework::updateConnection(const process::UPID& newPid) // TODO(benh): unlink(oldPid); pid_ = newPid; + objectApprovers = objectApprovers_; setState(State::CONNECTED); } void Framework::updateConnection( - const StreamingHttpConnection<v1::scheduler::Event>& newHttp) + const StreamingHttpConnection<v1::scheduler::Event>& newHttp, + const Owned<ObjectApprovers>& objectApprovers_) { // Note that master creates a new HTTP connection for every // subscribe request, so 'newHttp' should always be different @@ -587,6 +603,7 @@ void Framework::updateConnection( CHECK_NONE(http_); http_ = newHttp; + objectApprovers = objectApprovers_; setState(State::CONNECTED); } @@ -620,6 +637,12 @@ bool Framework::disconnect() http_ = None(); heartbeater.reset(); + + // `ObjectApprover`s are kept up-to-date by authorizer, which potentially + // entails continious interaction with an external IAM. Hence, we do not + // want to keep them alive if there is no subscribed scheduler. + objectApprovers.reset(); + setState(State::DISCONNECTED); return true; } @@ -703,6 +726,36 @@ void Framework::setState(Framework::State _state) metrics.subscribed = state == Framework::State::CONNECTED ? 1 : 0; } + +Try<bool> Framework::approved(const ActionObject& actionObject) const +{ + CHECK(objectApprovers.get() != nullptr) + << "Framework " << *this << " has no ObjectApprovers" + << " (attempt to call approved() for a disconnected framework?)"; + + return objectApprovers->approved( + actionObject.action(), + actionObject.object().getOrElse(authorization::Object())); +} + + +constexpr std::initializer_list<authorization::Action> SCHEDULER_API_ACTIONS{ + authorization::REGISTER_FRAMEWORK}; + + +Future<Owned<ObjectApprovers>> Framework::createObjectApprovers( + const Option<Authorizer*>& authorizer, + const FrameworkInfo& frameworkInfo) +{ + return ObjectApprovers::create( + authorizer, + frameworkInfo.has_principal() + ? Option<Principal>(frameworkInfo.principal()) + : Option<Principal>::none(), + SCHEDULER_API_ACTIONS); +} + + } // namespace master { } // namespace internal { } // namespace mesos { diff --git a/src/master/master.cpp b/src/master/master.cpp index 3c621e4..9d1e541 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2629,6 +2629,39 @@ Option<Error> Master::validateFramework( } +// Returns None if the framework object approvers are ready and the scheduler +// trying to SUBSCRIBE is authorized to do so with provided framework info. +// Otherwise, returns an error to be sent to the scheduler trying to subscribe. +static Option<Error> checkSubscribeAuthorization( + const Future<Owned<ObjectApprovers>>& frameworkObjectApprovers, + const FrameworkInfo& frameworkInfo) +{ + if (frameworkObjectApprovers.isFailed()) { + return Error( + "Authorization failure: could not create ObjectApprovers for a " + "framework: " + + frameworkObjectApprovers.failure()); + } + + auto actionObject = ActionObject::frameworkRegistration(frameworkInfo); + + CHECK(frameworkObjectApprovers.isReady()); + Try<bool> approved = frameworkObjectApprovers.get()->approved( + actionObject.action(), + actionObject.object().getOrElse(authorization::Object())); + + if (approved.isError()) { + return Error("Authorization failure: " + approved.error()); + } + + if (!*approved) { + return Error("Not authorized to " + stringify(actionObject)); + } + + return None(); +}; + + void Master::subscribe( StreamingHttpConnection<v1::scheduler::Event> http, scheduler::Call::Subscribe&& subscribe) @@ -2669,11 +2702,12 @@ void Master::subscribe( FrameworkInfo&&, bool, google::protobuf::RepeatedPtrField<string>&&, - const Future<bool>&) = &Self::_subscribe; + const Future<Owned<ObjectApprovers>>&) = &Self::_subscribe; - Future<bool> authorized = authorizeFramework(frameworkInfo); + Future<Owned<ObjectApprovers>> objectApprovers = + Framework::createObjectApprovers(authorizer, frameworkInfo); - authorized.onAny( + objectApprovers.onAny( defer(self(), _subscribe, http, @@ -2689,21 +2723,12 @@ void Master::_subscribe( FrameworkInfo&& frameworkInfo, bool force, google::protobuf::RepeatedPtrField<string>&& suppressedRolesField, - const Future<bool>& authorized) + const Future<Owned<ObjectApprovers>>& objectApprovers) { - CHECK(!authorized.isDiscarded()); - - Option<Error> authorizationError = None(); - - if (authorized.isFailed()) { - authorizationError = - Error("Authorization failure: " + authorized.failure()); - } else if (!authorized.get()) { - authorizationError = Error( - "Not authorized to use roles '" + - stringify(protobuf::framework::getRoles(frameworkInfo)) + "'"); - } + CHECK(!objectApprovers.isDiscarded()); + Option<Error> authorizationError = + checkSubscribeAuthorization(objectApprovers, frameworkInfo); if (authorizationError.isSome()) { LOG(INFO) << "Refusing subscription of framework" << " '" << frameworkInfo.name() << "'" @@ -2716,6 +2741,8 @@ void Master::_subscribe( return; } + CHECK(objectApprovers.isReady()); + LOG(INFO) << "Subscribing framework '" << frameworkInfo.name() << "' with checkpointing " << (frameworkInfo.checkpoint() ? "enabled" : "disabled") @@ -2731,7 +2758,8 @@ void Master::_subscribe( FrameworkInfo frameworkInfo_ = frameworkInfo; frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId()); - Framework* framework = new Framework(this, flags, frameworkInfo_, http); + Framework* framework = + new Framework(this, flags, frameworkInfo_, http, objectApprovers.get()); addFramework(framework, suppressedRoles); @@ -2798,11 +2826,16 @@ void Master::_subscribe( framework->reregisteredTime = Clock::now(); // Always failover the old framework connection. See MESOS-4712 for details. - failoverFramework(framework, http); + failoverFramework(framework, http, objectApprovers.get()); } else { // The framework has not yet reregistered after master failover. connectAndActivateRecoveredFramework( - framework, frameworkInfo, None(), http, suppressedRoles); + framework, + frameworkInfo, + None(), + http, + objectApprovers.get(), + suppressedRoles); } sendFrameworkUpdates(*framework); @@ -2906,18 +2939,19 @@ void Master::subscribe( FrameworkInfo&&, bool, google::protobuf::RepeatedPtrField<string>&&, - const Future<bool>&) = &Self::_subscribe; + const Future<Owned<ObjectApprovers>>&) = &Self::_subscribe; - Future<bool> authorized = authorizeFramework(frameworkInfo); + Future<Owned<ObjectApprovers>> objectApprovers = + Framework::createObjectApprovers(authorizer, frameworkInfo); - authorized.onAny( - defer(self(), - _subscribe, - from, - std::move(frameworkInfo), - subscribe.force(), - std::move(*subscribe.mutable_suppressed_roles()), - lambda::_1)); + objectApprovers.onAny(defer( + self(), + _subscribe, + from, + std::move(frameworkInfo), + subscribe.force(), + std::move(*subscribe.mutable_suppressed_roles()), + lambda::_1)); } @@ -2926,20 +2960,12 @@ void Master::_subscribe( FrameworkInfo&& frameworkInfo, bool force, google::protobuf::RepeatedPtrField<string>&& suppressedRolesField, - const Future<bool>& authorized) + const Future<Owned<ObjectApprovers>>& objectApprovers) { - CHECK(!authorized.isDiscarded()); + CHECK(!objectApprovers.isDiscarded()); - Option<Error> authorizationError = None(); - - if (authorized.isFailed()) { - authorizationError = - Error("Authorization failure: " + authorized.failure()); - } else if (!authorized.get()) { - authorizationError = Error( - "Not authorized to use roles '" + - stringify(protobuf::framework::getRoles(frameworkInfo)) + "'"); - } + Option<Error> authorizationError = + checkSubscribeAuthorization(objectApprovers, frameworkInfo); if (authorizationError.isSome()) { LOG(INFO) << "Refusing subscription of framework" @@ -2953,6 +2979,8 @@ void Master::_subscribe( return; } + CHECK(objectApprovers.isReady()); + // At this point, authentications errors will be due to // re-authentication during the authorization process, // so we drop the subscription. @@ -2997,7 +3025,8 @@ void Master::_subscribe( // Assign a new FrameworkID. frameworkInfo.mutable_id()->CopyFrom(newFrameworkId()); - Framework* framework = new Framework(this, flags, frameworkInfo, from); + Framework* framework = + new Framework(this, flags, frameworkInfo, from, objectApprovers.get()); addFramework(framework, suppressedRoles); @@ -3099,7 +3128,7 @@ void Master::_subscribe( // FrameworkReregisteredMessage back and activate the framework // if necesssary. LOG(INFO) << "Framework " << *framework << " failed over"; - failoverFramework(framework, from); + failoverFramework(framework, from, objectApprovers.get()); } else { LOG(INFO) << "Allowing framework " << *framework << " to subscribe with an already used id"; @@ -3130,7 +3159,7 @@ void Master::_subscribe( // framework link previously broke. link(framework->pid().get()); - framework->updateConnection(*(framework->pid())); + framework->updateConnection(*(framework->pid()), objectApprovers.get()); if (framework->activate()) { // The framework was not active and needs to be activated in allocator. // @@ -3147,7 +3176,12 @@ void Master::_subscribe( } else { // The framework has not yet reregistered after master failover. connectAndActivateRecoveredFramework( - framework, frameworkInfo, from, None(), suppressedRoles); + framework, + frameworkInfo, + from, + None(), + objectApprovers.get(), + suppressedRoles); } sendFrameworkUpdates(*framework); @@ -10563,6 +10597,7 @@ void Master::connectAndActivateRecoveredFramework( const FrameworkInfo& frameworkInfo, const Option<UPID>& pid, const Option<StreamingHttpConnection<v1::scheduler::Event>>& http, + const Owned<ObjectApprovers>& objectApprovers, const set<string>& suppressedRoles) { // Exactly one of `pid` or `http` must be provided. @@ -10587,10 +10622,10 @@ void Master::connectAndActivateRecoveredFramework( // Update the framework's connection state. if (pid.isSome()) { - framework->updateConnection(pid.get()); + framework->updateConnection(pid.get(), objectApprovers); link(pid.get()); } else { - framework->updateConnection(http.get()); + framework->updateConnection(http.get(), objectApprovers); http->closed() .onAny(defer(self(), &Self::exited, framework->id(), http.get())); } @@ -10641,7 +10676,8 @@ void Master::connectAndActivateRecoveredFramework( void Master::failoverFramework( Framework* framework, - const StreamingHttpConnection<v1::scheduler::Event>& http) + const StreamingHttpConnection<v1::scheduler::Event>& http, + const Owned<ObjectApprovers>& objectApprovers) { CHECK_NOTNULL(framework); @@ -10665,7 +10701,7 @@ void Master::failoverFramework( frameworks.principals.erase(framework->pid().get()); } - framework->updateConnection(http); + framework->updateConnection(http, objectApprovers); http.closed() .onAny(defer(self(), &Self::exited, framework->id(), http)); @@ -10679,7 +10715,10 @@ void Master::failoverFramework( // Replace the scheduler for a framework with a new process ID, in the // event of a scheduler failover. -void Master::failoverFramework(Framework* framework, const UPID& newPid) +void Master::failoverFramework( + Framework* framework, + const UPID& newPid, + const Owned<ObjectApprovers>& objectApprovers) { CHECK_NOTNULL(framework); @@ -10701,7 +10740,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid) framework->send(message); } - framework->updateConnection(newPid); + framework->updateConnection(newPid, objectApprovers); link(newPid); _failoverFramework(framework); diff --git a/src/master/master.hpp b/src/master/master.hpp index f3239cd..e0ef1cd 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -640,17 +640,22 @@ protected: const FrameworkInfo& frameworkInfo, const Option<process::UPID>& pid, const Option<StreamingHttpConnection<v1::scheduler::Event>>& http, + const process::Owned<ObjectApprovers>& objectApprovers, const std::set<std::string>& suppressedRoles); // Replace the scheduler for a framework with a new process ID, in // the event of a scheduler failover. - void failoverFramework(Framework* framework, const process::UPID& newPid); + void failoverFramework( + Framework* framework, + const process::UPID& newPid, + const process::Owned<ObjectApprovers>& objectApprovers); // Replace the scheduler for a framework with a new HTTP connection, // in the event of a scheduler failover. void failoverFramework( Framework* framework, - const StreamingHttpConnection<v1::scheduler::Event>& http); + const StreamingHttpConnection<v1::scheduler::Event>& http, + const process::Owned<ObjectApprovers>& objectApprovers); void _failoverFramework(Framework* framework); @@ -919,7 +924,7 @@ private: FrameworkInfo&& frameworkInfo, bool force, google::protobuf::RepeatedPtrField<std::string>&& suppressedRoles, - const process::Future<bool>& authorized); + const process::Future<process::Owned<ObjectApprovers>>& objectApprovers); void subscribe( const process::UPID& from, @@ -930,7 +935,7 @@ private: FrameworkInfo&& frameworkInfo, bool force, google::protobuf::RepeatedPtrField<std::string>&& suppressedRoles, - const process::Future<bool>& authorized); + const process::Future<process::Owned<ObjectApprovers>>& objectApprovers); // Update framework via SchedulerDriver (i.e. no response // code feedback, FrameworkErrorMessage on error). @@ -2441,12 +2446,14 @@ struct Framework const Flags& masterFlags, const FrameworkInfo& info, const process::UPID& _pid, + const process::Owned<ObjectApprovers>& objectApprovers, const process::Time& time = process::Clock::now()); Framework(Master* const master, const Flags& masterFlags, const FrameworkInfo& info, const StreamingHttpConnection<v1::scheduler::Event>& _http, + const process::Owned<ObjectApprovers>& objectApprovers, const process::Time& time = process::Clock::now()); Framework(Master* const master, @@ -2515,14 +2522,18 @@ struct Framework // Reactivate framework with new connection: update connection-related state // and mark the framework as CONNECTED, regardless of the previous state. - void updateConnection(const process::UPID& newPid); void updateConnection( - const StreamingHttpConnection<v1::scheduler::Event>& newHttp); + const process::UPID& newPid, + const process::Owned<ObjectApprovers>& objectApprovers); + + void updateConnection( + const StreamingHttpConnection<v1::scheduler::Event>& newHttp, + const process::Owned<ObjectApprovers>& objectApprovers); // If the framework is CONNECTED, clear all state associated with // the scheduler being connected (close http connection, stop heartbeater, - // etc.), mark the framework DISCONNECTED and return `true`. - // Otherwise, return `false`. + // clear object approvers, etc.), mark the framework DISCONNECTED and return + // `true`. Otherwise, return `false`. bool disconnect(); // Mark the framework as active (eligible to receive offers if connected) @@ -2548,6 +2559,16 @@ struct Framework const Option<process::UPID>& pid() const { return pid_; } + // Returns ObjectApprovers for all actions + // needed to authorize scheduler API calls. + static process::Future<process::Owned<ObjectApprovers>> createObjectApprovers( + const Option<Authorizer*>& _authorizer, + const FrameworkInfo& frameworkInfo); + + // Returns whether the framework principal is authorized to perform + // action on object. + Try<bool> approved(const authorization::ActionObject& actionObject) const; + Master* const master; FrameworkInfo info; @@ -2640,6 +2661,7 @@ private: const FrameworkInfo& _info, State state, bool active, + const process::Owned<ObjectApprovers>& objectApprovers, const process::Time& time); Framework(const Framework&); // No copying. @@ -2667,6 +2689,9 @@ private: // This is only set for HTTP frameworks. process::Owned<ResponseHeartbeater<scheduler::Event, v1::scheduler::Event>> heartbeater; + + // ObjectApprovers for the framework's principal. + process::Owned<ObjectApprovers> objectApprovers; }; diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index bc8155b..4074a18 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -15,6 +15,7 @@ // limitations under the License. #include <atomic> +#include <memory> #include <set> #include <string> #include <utility> @@ -68,6 +69,8 @@ namespace http = process::http; +using std::shared_ptr; + using google::protobuf::RepeatedPtrField; using mesos::internal::master::Master; @@ -102,6 +105,7 @@ using testing::An; using testing::AtMost; using testing::DoAll; using testing::Eq; +using testing::Ne; using testing::Invoke; using testing::Return; using testing::Truly; @@ -1084,6 +1088,10 @@ TEST_F(MasterAuthorizationTest, UnauthorizedRole) driver.join(); } +static shared_ptr<const ObjectApprover> getAcceptingObjectApprover() +{ + return std::make_shared<AcceptingObjectApprover>(); +} // This test verifies that an authentication request that comes from // the same instance of the framework (e.g., ZK blip) before @@ -1108,15 +1116,18 @@ TEST_F(MasterAuthorizationTest, DuplicateRegistration) // Return pending futures from authorizer. Future<Nothing> authorize1; - Promise<bool> promise1; + Promise<shared_ptr<const ObjectApprover>> promise1; Future<Nothing> authorize2; - Promise<bool> promise2; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize1), - Return(promise1.future()))) - .WillOnce(DoAll(FutureSatisfy(&authorize2), - Return(promise2.future()))) - .WillRepeatedly(Return(true)); // Authorize subsequent registration retries. + Promise<shared_ptr<const ObjectApprover>> promise2; + + // Expect requests for two approvers for REGISTER_FRAMEWORK. + EXPECT_CALL(authorizer, getApprover(_, authorization::REGISTER_FRAMEWORK)) + .WillOnce(DoAll(FutureSatisfy(&authorize1), Return(promise1.future()))) + .WillOnce(DoAll(FutureSatisfy(&authorize2), Return(promise2.future()))); + + // Handle requests for all other approvers. + EXPECT_CALL(authorizer, getApprover(_, Ne(authorization::REGISTER_FRAMEWORK))) + .WillRepeatedly(Return(getAcceptingObjectApprover())); // Pause the clock to avoid registration retries. Clock::pause(); @@ -1133,7 +1144,7 @@ TEST_F(MasterAuthorizationTest, DuplicateRegistration) AWAIT_READY(authorize2); // Now complete the first authorization attempt. - promise1.set(true); + promise1.set(getAcceptingObjectApprover()); // First registration request should succeed because the // framework PID did not change. @@ -1143,7 +1154,7 @@ TEST_F(MasterAuthorizationTest, DuplicateRegistration) FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _); // Now complete the second authorization attempt. - promise2.set(true); + promise2.set(getAcceptingObjectApprover()); // Master should acknowledge the second registration attempt too. AWAIT_READY(frameworkRegisteredMessage); @@ -1176,16 +1187,16 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration) // Return pending futures from authorizer after the first attempt. Future<Nothing> authorize2; - Promise<bool> promise2; + Promise<shared_ptr<const ObjectApprover>> promise2; Future<Nothing> authorize3; - Promise<bool> promise3; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(Return(true)) - .WillOnce(DoAll(FutureSatisfy(&authorize2), - Return(promise2.future()))) - .WillOnce(DoAll(FutureSatisfy(&authorize3), - Return(promise3.future()))) - .WillRepeatedly(Return(true)); // Authorize subsequent registration retries. + Promise<shared_ptr<const ObjectApprover>> promise3; + EXPECT_CALL(authorizer, getApprover(_, authorization::REGISTER_FRAMEWORK)) + .WillOnce(Return(getAcceptingObjectApprover())) + .WillOnce(DoAll(FutureSatisfy(&authorize2), Return(promise2.future()))) + .WillOnce(DoAll(FutureSatisfy(&authorize3), Return(promise3.future()))); + + EXPECT_CALL(authorizer, getApprover(_, Ne(authorization::REGISTER_FRAMEWORK))) + .WillRepeatedly(Return(getAcceptingObjectApprover())); // Pause the clock to avoid re-registration retries. Clock::pause(); @@ -1214,7 +1225,7 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration) .WillOnce(FutureSatisfy(&reregistered)); // Now complete the second authorization attempt. - promise2.set(true); + promise2.set(getAcceptingObjectApprover()); // First re-registration request should succeed because the // framework PID did not change. @@ -1224,7 +1235,7 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration) FUTURE_PROTOBUF(FrameworkReregisteredMessage(), _, _); // Now complete the third authorization attempt. - promise3.set(true); + promise3.set(getAcceptingObjectApprover()); // Master should acknowledge the second re-registration attempt too. AWAIT_READY(frameworkReregisteredMessage); @@ -1235,7 +1246,7 @@ TEST_F(MasterAuthorizationTest, DuplicateReregistration) // This test ensures that a framework that is removed while -// authorization for registration is in progress is properly handled. +// obtaining ObjectApprovers during registration is properly handled. TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration) { MockAuthorizer authorizer; @@ -1248,11 +1259,13 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration) // Return a pending future from authorizer. Future<Nothing> authorize; - Promise<bool> promise; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(DoAll(FutureSatisfy(&authorize), - Return(promise.future()))) - .WillRepeatedly(Return(true)); // Authorize subsequent registration retries. + Promise<shared_ptr<const ObjectApprover>> promise; + EXPECT_CALL(authorizer, getApprover(_, authorization::REGISTER_FRAMEWORK)) + .WillRepeatedly(DoAll(FutureSatisfy(&authorize), Return(promise.future()))); + + EXPECT_CALL(authorizer, getApprover(_, Ne(authorization::REGISTER_FRAMEWORK))) + .WillRepeatedly(Return(getAcceptingObjectApprover())); + // Pause the clock to avoid scheduler registration retries. Clock::pause(); @@ -1276,8 +1289,8 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeRegistration) Future<Nothing> removeFramework = FUTURE_DISPATCH(_, &MesosAllocatorProcess::removeFramework); - // Now complete authorization. - promise.set(true); + // Now make all the returned approvers ready. + promise.set(getAcceptingObjectApprover()); // When the master tries to link to a non-existent framework PID // it should realize the framework is gone and remove it. @@ -1305,13 +1318,17 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration) EXPECT_CALL(sched, registered(&driver, _, _)) .WillOnce(FutureSatisfy(®istered)); - // Return a pending future from authorizer after first attempt. + // Return a pending future from authorizer after first request for + // REGISTER_FRAMEWORK approver Future<Nothing> authorize2; - Promise<bool> promise2; - EXPECT_CALL(authorizer, authorized(_)) - .WillOnce(Return(true)) - .WillOnce(DoAll(FutureSatisfy(&authorize2), - Return(promise2.future()))); + Promise<shared_ptr<const ObjectApprover>> promise2; + EXPECT_CALL(authorizer, getApprover(_, authorization::REGISTER_FRAMEWORK)) + .WillOnce(Return(getAcceptingObjectApprover())) + .WillOnce(DoAll(FutureSatisfy(&authorize2), Return(promise2.future()))); + + // Handle all other actions. + EXPECT_CALL(authorizer, getApprover(_, Ne(authorization::REGISTER_FRAMEWORK))) + .WillRepeatedly(Return(getAcceptingObjectApprover())); // Pause the clock to avoid scheduler registration retries. Clock::pause(); @@ -1344,7 +1361,7 @@ TEST_F(MasterAuthorizationTest, FrameworkRemovedBeforeReregistration) AWAIT_READY(removeFramework); // Now complete the second authorization attempt. - promise2.set(true); + promise2.set(getAcceptingObjectApprover()); // Master should drop the second framework re-registration request // because the framework PID was removed from 'authenticated' map. diff --git a/src/tests/master_load_tests.cpp b/src/tests/master_load_tests.cpp index c7c1b12..f6656ff 100644 --- a/src/tests/master_load_tests.cpp +++ b/src/tests/master_load_tests.cpp @@ -122,7 +122,7 @@ public: BlockingAuthorizerProcess(Authorizer* underlying) : ProcessBase(process::ID::generate("blocking-authorizer")), underlying_(underlying), - blocked_(true) {} + blocked_(false) {} Future<bool> authorized(const authorization::Request& request) { @@ -151,7 +151,14 @@ public: return promises_.size(); } - // Satisfies all future and prior calls made to `getApprover`. + Future<Nothing> block() + { + blocked_ = true; + + return Nothing(); + } + + // Satisfies all future and prending calls made to `getApprover`. Future<Nothing> unleash() { CHECK_EQ(promises_.size(), futures_.size()); @@ -217,6 +224,13 @@ public: &BlockingAuthorizerProcess::pending); } + Future<Nothing> block() + { + return process::dispatch( + process_.get(), + &BlockingAuthorizerProcess::block); + } + Future<Nothing> unleash() { return process::dispatch( @@ -266,6 +280,11 @@ void MasterLoadTest::prepareCluster(Authorizer* authorizer) slave_ = slave.get(); AWAIT_READY(slaveRegisteredMessage); + + // NOTE: Authorizer is blocked after preparing the cluster, otherwise + // framework registration, which also uses `prepareObjectApprover(...) will + // be blocked too. + authorizer_->block(); }
