Applied RegisterAgent ACL to the master.

Review: https://reviews.apache.org/r/57535


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/29fc2dfc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/29fc2dfc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/29fc2dfc

Branch: refs/heads/master
Commit: 29fc2dfcb110a51923d4d7c144bdd797b348f96b
Parents: 38afa9c
Author: Jiang Yan Xu <[email protected]>
Authored: Fri Mar 10 15:25:43 2017 -0800
Committer: Jiang Yan Xu <[email protected]>
Committed: Fri Apr 28 14:55:08 2017 -0700

----------------------------------------------------------------------
 src/master/master.cpp                    | 247 +++++++++++++++++++++-----
 src/master/master.hpp                    |  39 +++-
 src/tests/master_authorization_tests.cpp | 162 +++++++++++++++++
 3 files changed, 394 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/29fc2dfc/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e8c2a96..2be4056 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3651,6 +3651,31 @@ Future<bool> Master::authorizeDestroyVolume(
 }
 
 
+Future<bool> Master::authorizeSlave(const Option<string>& principal)
+{
+  if (authorizer.isNone()) {
+    return true;
+  }
+
+  LOG(INFO) << "Authorizing agent "
+            << (principal.isSome()
+                ? "with principal '" + principal.get() + "'"
+                : "without a principal");
+
+  authorization::Request request;
+  request.set_action(authorization::REGISTER_AGENT);
+
+  if (principal.isSome()) {
+    request.mutable_subject()->set_value(principal.get());
+  }
+
+  // No need to set the request's object as it is implicitly set to
+  // ANY by the authorizer.
+
+  return authorizer.get()->authorized(request);
+}
+
+
 Resources Master::addTask(
     const TaskInfo& task,
     Framework* framework,
@@ -5388,26 +5413,91 @@ void Master::registerSlave(
     return;
   }
 
+  if (slaves.registering.contains(from)) {
+    LOG(INFO) << "Ignoring register agent message from " << from
+              << " (" << slaveInfo.hostname() << ") as registration"
+              << " is already in progress";
+    return;
+  }
+
+  slaves.registering.insert(from);
+
+  // Note that the principal may be empty if authentication is not
+  // required. Also it is passed along because it may be removed from
+  // `authenticated` while the authorization is pending.
+  Option<string> principal = authenticated.get(from);
+
+  authorizeSlave(principal)
+    .onAny(defer(self(),
+                 &Self::_registerSlave,
+                 slaveInfo,
+                 from,
+                 principal,
+                 checkpointedResources,
+                 version,
+                 agentCapabilities,
+                 lambda::_1));
+}
+
+
+void Master::_registerSlave(
+    const SlaveInfo& slaveInfo,
+    const UPID& pid,
+    const Option<string>& principal,
+    const vector<Resource>& checkpointedResources,
+    const string& version,
+    const vector<SlaveInfo::Capability>& agentCapabilities,
+    const Future<bool>& authorized)
+{
+  CHECK(!authorized.isDiscarded());
+  CHECK(slaves.registering.contains(pid));
+
+  Option<string> authorizationError = None();
+
+  if (authorized.isFailed()) {
+    authorizationError = "Authorization failure: " + authorized.failure();
+  } else if (!authorized.get()) {
+    authorizationError =
+      "Not authorized to register as agent " +
+      (principal.isSome()
+       ? "with principal '" + principal.get() + "'"
+       : "without a principal");
+  }
+
+  if (authorizationError.isSome()) {
+    LOG(WARNING) << "Refusing registration of agent at " << pid
+                 << ": " << authorizationError.get();
+
+    ShutdownMessage message;
+    message.set_message(authorizationError.get());
+    send(pid, message);
+
+    slaves.registering.erase(pid);
+    return;
+  }
+
   MachineID machineId;
   machineId.set_hostname(slaveInfo.hostname());
-  machineId.set_ip(stringify(from.address.ip));
+  machineId.set_ip(stringify(pid.address.ip));
 
   // Slaves are not allowed to register while the machine they are on is in
   // `DOWN` mode.
   if (machines.contains(machineId) &&
       machines[machineId].info.mode() == MachineInfo::DOWN) {
-    LOG(WARNING) << "Refusing registration of agent at " << from
+    LOG(WARNING) << "Refusing registration of agent at " << pid
                  << " because the machine '" << machineId << "' that it is "
                  << "running on is `DOWN`";
 
     ShutdownMessage message;
     message.set_message("Machine is `DOWN`");
-    send(from, message);
+    send(pid, message);
+
+    slaves.registering.erase(pid);
     return;
   }
 
   // Check if this slave is already registered (because it retries).
-  if (Slave* slave = slaves.registered.get(from)) {
+  if (Slave* slave = slaves.registered.get(pid)) {
     if (!slave->connected) {
       // The slave was previously disconnected but it is now trying
       // to register as a new slave. This could happen if the slave
@@ -5433,33 +5523,25 @@ void Master::registerSlave(
       SlaveRegisteredMessage message;
       message.mutable_slave_id()->CopyFrom(slave->id);
       message.mutable_connection()->CopyFrom(connection);
-      send(from, message);
+      send(pid, message);
+
+      slaves.registering.erase(pid);
       return;
     }
   }
 
-  // We need to generate a SlaveID and admit this slave only *once*.
-  if (slaves.registering.contains(from)) {
-    LOG(INFO) << "Ignoring register agent message from " << from
-              << " (" << slaveInfo.hostname() << ") as admission is"
-              << " already in progress";
-    return;
-  }
-
-  slaves.registering.insert(from);
-
   // Create and add the slave id.
   SlaveInfo slaveInfo_ = slaveInfo;
   slaveInfo_.mutable_id()->CopyFrom(newSlaveId());
 
-  LOG(INFO) << "Registering agent at " << from << " ("
+  LOG(INFO) << "Registering agent at " << pid << " ("
             << slaveInfo.hostname() << ") with id " << slaveInfo_.id();
 
   registrar->apply(Owned<Operation>(new AdmitSlave(slaveInfo_)))
     .onAny(defer(self(),
-                 &Self::_registerSlave,
+                 &Self::__registerSlave,
                  slaveInfo_,
-                 from,
+                 pid,
                  checkpointedResources,
                  version,
                  agentCapabilities,
@@ -5467,7 +5549,7 @@ void Master::registerSlave(
 }
 
 
-void Master::_registerSlave(
+void Master::__registerSlave(
     const SlaveInfo& slaveInfo,
     const UPID& pid,
     const vector<Resource>& checkpointedResources,
@@ -5476,7 +5558,6 @@ void Master::_registerSlave(
     const Future<bool>& admit)
 {
   CHECK(slaves.registering.contains(pid));
-  slaves.registering.erase(pid);
 
   CHECK(!admit.isDiscarded());
 
@@ -5495,6 +5576,8 @@ void Master::_registerSlave(
                  << " (" << slaveInfo.hostname() << ") was assigned"
                  << " an agent ID that already appears in the registry;"
                  << " ignoring registration attempt";
+
+    slaves.registering.erase(pid);
     return;
   }
 
@@ -5528,6 +5611,8 @@ void Master::_registerSlave(
 
   LOG(INFO) << "Registered agent " << *slave
             << " with " << slave->info.resources();
+
+  slaves.registering.erase(pid);
 }
 
 
@@ -5585,21 +5670,95 @@ void Master::reregisterSlave(
     return;
   }
 
+  if (slaves.reregistering.contains(slaveInfo.id())) {
+    LOG(INFO)
+      << "Ignoring re-register agent message from agent "
+      << slaveInfo.id() << " at " << from << " ("
+      << slaveInfo.hostname() << ") as re-registration is already in progress";
+    return;
+  }
+
+  slaves.reregistering.insert(slaveInfo.id());
+
+  // Note that the principal may be empty if authentication is not
+  // required. Also it is passed along because it may be removed from
+  // `authenticated` while the authorization is pending.
+  Option<string> principal = authenticated.get(from);
+
+  authorizeSlave(principal)
+    .onAny(defer(self(),
+                 &Self::_reregisterSlave,
+                 slaveInfo,
+                 from,
+                 principal,
+                 checkpointedResources,
+                 executorInfos,
+                 tasks,
+                 frameworks,
+                 completedFrameworks,
+                 version,
+                 agentCapabilities,
+                 lambda::_1));
+}
+
+
+void Master::_reregisterSlave(
+    const SlaveInfo& slaveInfo,
+    const UPID& pid,
+    const Option<string>& principal,
+    const vector<Resource>& checkpointedResources,
+    const vector<ExecutorInfo>& executorInfos,
+    const vector<Task>& tasks,
+    const vector<FrameworkInfo>& frameworks,
+    const vector<Archive::Framework>& completedFrameworks,
+    const string& version,
+    const vector<SlaveInfo::Capability>& agentCapabilities,
+    const Future<bool>& authorized)
+{
+  CHECK(!authorized.isDiscarded());
+  CHECK(slaves.reregistering.contains(slaveInfo.id()));
+
+  Option<string> authorizationError = None();
+
+  if (authorized.isFailed()) {
+    authorizationError = "Authorization failure: " + authorized.failure();
+  } else if (!authorized.get()) {
+    authorizationError =
+      "Not authorized to re-register as agent " +
+      (principal.isSome()
+       ? "with principal '" + principal.get() + "'"
+       : "without a principal");
+  }
+
+  if (authorizationError.isSome()) {
+    LOG(WARNING) << "Refusing re-registration of agent at " << pid
+                 << ": " << authorizationError.get();
+
+    ShutdownMessage message;
+    message.set_message(authorizationError.get());
+    send(pid, message);
+
+    slaves.reregistering.erase(slaveInfo.id());
+    return;
+  }
+
   MachineID machineId;
   machineId.set_hostname(slaveInfo.hostname());
-  machineId.set_ip(stringify(from.address.ip));
+  machineId.set_ip(stringify(pid.address.ip));
 
-  // Slaves are not allowed to register while the machine they are on is in
+  // Slaves are not allowed to re-register while the machine they are on is in
   // 'DOWN` mode.
   if (machines.contains(machineId) &&
       machines[machineId].info.mode() == MachineInfo::DOWN) {
-    LOG(WARNING) << "Refusing re-registration of agent at " << from
+    LOG(WARNING) << "Refusing re-registration of agent at " << pid
                  << " because the machine '" << machineId << "' that it is "
                  << "running on is `DOWN`";
 
     ShutdownMessage message;
     message.set_message("Machine is `DOWN`");
-    send(from, message);
+    send(pid, message);
+
+    slaves.reregistering.erase(slaveInfo.id());
     return;
   }
 
@@ -5619,9 +5778,9 @@ void Master::reregisterSlave(
     // hostname. This is because maintenance is scheduled at the
     // machine level; so we would need to re-validate the slave's
     // unavailability if the machine it is running on changed.
-    if (slave->pid.address.ip != from.address.ip ||
+    if (slave->pid.address.ip != pid.address.ip ||
         slave->info.hostname() != slaveInfo.hostname()) {
-      LOG(WARNING) << "Agent " << slaveInfo.id() << " at " << from
+      LOG(WARNING) << "Agent " << slaveInfo.id() << " at " << pid
                    << " (" << slaveInfo.hostname() << ") attempted to "
                    << "re-register with different IP / hostname; expected "
                    << slave->pid.address.ip << " (" << slave->info.hostname()
@@ -5631,7 +5790,9 @@ void Master::reregisterSlave(
       message.set_message(
           "Agent attempted to re-register with different IP / hostname");
 
-      send(from, message);
+      send(pid, message);
+
+      slaves.reregistering.erase(slaveInfo.id());
       return;
     }
 
@@ -5641,7 +5802,7 @@ void Master::reregisterSlave(
     // in succession for a disconnected slave. As a result, we
     // ignore duplicate exited events for disconnected slaves.
     // See: https://issues.apache.org/jira/browse/MESOS-675
-    slave->pid = from;
+    slave->pid = pid;
     link(slave->pid);
 
     // Update slave's version, re-registration timestamp and
@@ -5676,26 +5837,15 @@ void Master::reregisterSlave(
 
     // Inform the agent of the master's version of its checkpointed
     // resources and the new framework pids for its tasks.
-    __reregisterSlave(slave, tasks, frameworks);
-
-    return;
-  }
+    ___reregisterSlave(slave, tasks, frameworks);
 
-  // If we're already re-registering this slave, then no need to ask
-  // the registrar again.
-  if (slaves.reregistering.contains(slaveInfo.id())) {
-    LOG(INFO)
-      << "Ignoring re-register agent message from agent "
-      << slaveInfo.id() << " at " << from << " ("
-      << slaveInfo.hostname() << ") as readmission is already in progress";
+    slaves.reregistering.erase(slaveInfo.id());
     return;
   }
 
-  LOG(INFO) << "Re-registering agent " << slaveInfo.id() << " at " << from
+  LOG(INFO) << "Re-registering agent " << slaveInfo.id() << " at " << pid
             << " (" << slaveInfo.hostname() << ")";
 
-  slaves.reregistering.insert(slaveInfo.id());
-
   // Consult the registry to determine whether to readmit the
   // slave. In the common case, the slave has been marked unreachable
   // by the master, so we move the slave to the reachable list and
@@ -5704,9 +5854,9 @@ void Master::reregisterSlave(
   // GC'd), we admit the slave anyway.
   registrar->apply(Owned<Operation>(new MarkSlaveReachable(slaveInfo)))
     .onAny(defer(self(),
-                 &Self::_reregisterSlave,
+                 &Self::__reregisterSlave,
                  slaveInfo,
-                 from,
+                 pid,
                  checkpointedResources,
                  executorInfos,
                  tasks,
@@ -5718,7 +5868,7 @@ void Master::reregisterSlave(
 }
 
 
-void Master::_reregisterSlave(
+void Master::__reregisterSlave(
     const SlaveInfo& slaveInfo,
     const UPID& pid,
     const vector<Resource>& checkpointedResources,
@@ -5731,7 +5881,6 @@ void Master::_reregisterSlave(
     const Future<bool>& readmit)
 {
   CHECK(slaves.reregistering.contains(slaveInfo.id()));
-  slaves.reregistering.erase(slaveInfo.id());
 
   if (readmit.isFailed()) {
     LOG(FATAL) << "Failed to readmit agent " << slaveInfo.id() << " at " << pid
@@ -5966,11 +6115,13 @@ void Master::_reregisterSlave(
     }
   }
 
-  __reregisterSlave(slave, tasks, frameworks);
+  ___reregisterSlave(slave, tasks, frameworks);
+
+  slaves.reregistering.erase(slaveInfo.id());
 }
 
 
-void Master::__reregisterSlave(
+void Master::___reregisterSlave(
     Slave* slave,
     const vector<Task>& tasks,
     const vector<FrameworkInfo>& frameworks)

http://git-wip-us.apache.org/repos/asf/mesos/blob/29fc2dfc/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d537933..eca353b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -514,6 +514,15 @@ protected:
   void _registerSlave(
       const SlaveInfo& slaveInfo,
       const process::UPID& pid,
+      const Option<std::string>& principal,
+      const std::vector<Resource>& checkpointedResources,
+      const std::string& version,
+      const std::vector<SlaveInfo::Capability>& agentCapabilities,
+      const process::Future<bool>& authorized);
+
+  void __registerSlave(
+      const SlaveInfo& slaveInfo,
+      const process::UPID& pid,
       const std::vector<Resource>& checkpointedResources,
       const std::string& version,
       const std::vector<SlaveInfo::Capability>& agentCapabilities,
@@ -522,6 +531,7 @@ protected:
   void _reregisterSlave(
       const SlaveInfo& slaveInfo,
       const process::UPID& pid,
+      const Option<std::string>& principal,
       const std::vector<Resource>& checkpointedResources,
       const std::vector<ExecutorInfo>& executorInfos,
       const std::vector<Task>& tasks,
@@ -529,9 +539,21 @@ protected:
       const std::vector<Archive::Framework>& completedFrameworks,
       const std::string& version,
       const std::vector<SlaveInfo::Capability>& agentCapabilities,
-      const process::Future<bool>& readmit);
+      const process::Future<bool>& authorized);
 
   void __reregisterSlave(
+      const SlaveInfo& slaveInfo,
+      const process::UPID& pid,
+      const std::vector<Resource>& checkpointedResources,
+      const std::vector<ExecutorInfo>& executorInfos,
+      const std::vector<Task>& tasks,
+      const std::vector<FrameworkInfo>& frameworks,
+      const std::vector<Archive::Framework>& completedFrameworks,
+      const std::string& version,
+      const std::vector<SlaveInfo::Capability>& agentCapabilities,
+      const process::Future<bool>& readmit);
+
+  void ___reregisterSlave(
       Slave* slave,
       const std::vector<Task>& tasks,
       const std::vector<FrameworkInfo>& frameworks);
@@ -659,6 +681,9 @@ protected:
   process::Future<bool> authorizeFramework(
       const FrameworkInfo& frameworkInfo);
 
+  // Returns whether the principal is authorized to (re-)register an agent.
+  process::Future<bool> authorizeSlave(const Option<std::string>& principal);
+
   // Returns whether the task is authorized.
   // Returns failure for transient authorization failures.
   process::Future<bool> authorizeTask(
@@ -1623,14 +1648,16 @@ private:
     // failover. Slaves are removed from this collection when they
     // either re-register with the master or are marked unreachable
     // because they do not re-register before `recoveredTimer` fires.
+    // We must not answer questions related to these slaves (e.g.,
+    // during task reconciliation) until we determine their fate
+    // because they are in this transitioning state.
     hashmap<SlaveID, SlaveInfo> recovered;
 
-    // Slaves that are in the process of registering.
+    // Agents that are in the process of (re-)registering. They are
+    // maintained here while the (re-)registration is in progress and
+    // possibly pending in the authorizer or the registrar in order
+    // to help deduplicate (re-)registration requests.
     hashset<process::UPID> registering;
-
-    // Only those slaves that are re-registering for the first time
-    // with this master. We must not answer questions related to
-    // these slaves until the registrar determines their fate.
     hashset<SlaveID> reregistering;
 
     // Registered slaves are indexed by SlaveID and UPID. Note that

http://git-wip-us.apache.org/repos/asf/mesos/blob/29fc2dfc/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index 1a0285a..a646768 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -47,6 +47,7 @@
 
 #include "messages/messages.hpp"
 
+#include "slave/constants.hpp"
 #include "slave/slave.hpp"
 
 #include "tests/containerizer.hpp"
@@ -67,6 +68,7 @@ using mesos::master::detector::StandaloneMasterDetector;
 
 using process::Clock;
 using process::Future;
+using process::Message;
 using process::Owned;
 using process::PID;
 using process::Promise;
@@ -82,6 +84,7 @@ using testing::_;
 using testing::An;
 using testing::AtMost;
 using testing::DoAll;
+using testing::Eq;
 using testing::Return;
 
 namespace mesos {
@@ -2323,6 +2326,165 @@ TYPED_TEST(MasterAuthorizerTest, FilterOrphanedTasks)
   driver.join();
 }
 
+
+TEST_F(MasterAuthorizationTest, AuthorizedToRegisterAndReregisterAgent)
+{
+  // Set up ACLs so that the agent can (re)register.
+  ACLs acls;
+  mesos::ACL::RegisterAgent* acl = acls.add_register_agents();
+  acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
+  acl->mutable_agent()->set_type(ACL::Entity::ANY);
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.acls = acls;
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Future<Message> slaveRegisteredMessage =
+    FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Simulate a recovered agent and verify that it is allowed to reregister.
+  slave->reset();
+
+  Future<Message> slaveReregisteredMessage =
+    FUTURE_MESSAGE(Eq(SlaveReregisteredMessage().GetTypeName()), _, _);
+
+  slave = StartSlave(detector.get(), slaveFlags);
+
+  AWAIT_READY(slaveReregisteredMessage);
+}
+
+
+// This test verifies that the agent is shut down by the master if
+// it is not authorized to register.
+TEST_F(MasterAuthorizationTest, UnauthorizedToRegisterAgent)
+{
+  // Set up ACLs that disallow the agent's principal from registering.
+  ACLs acls;
+  mesos::ACL::RegisterAgent* acl = acls.add_register_agents();
+  acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
+  acl->mutable_agent()->set_type(ACL::Entity::NONE);
+
+  master::Flags flags = CreateMasterFlags();
+  flags.acls = acls;
+
+  Try<Owned<cluster::Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  Future<Message> shutdownMessage  =
+    FUTURE_MESSAGE(Eq(ShutdownMessage ().GetTypeName()), _, _);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(shutdownMessage);
+}
+
+
+// This test verifies that an agent authorized to register can be
+// unauthorized to re-register due to master ACL change (after failover).
+TEST_F(MasterAuthorizationTest, UnauthorizedToReregisterAgent)
+{
+  // Set up ACLs so that the agent can register.
+  ACLs acls;
+  mesos::ACL::RegisterAgent* acl = acls.add_register_agents();
+  acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
+  acl->mutable_agent()->set_type(ACL::Entity::ANY);
+
+  master::Flags flags = CreateMasterFlags();
+  flags.acls = acls;
+
+  Try<Owned<cluster::Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  Future<Message> slaveRegisteredMessage =
+    FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Master fails over.
+  master->reset();
+
+  // The new master doesn't allow this agent principal to re-register.
+  acl->mutable_agent()->set_type(ACL::Entity::NONE);
+  flags.acls = acls;
+
+  Future<Message> shutdownMessage =
+    FUTURE_MESSAGE(Eq(ShutdownMessage().GetTypeName()), _, _);
+
+  master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  detector.appoint(master.get()->pid);
+
+  AWAIT_READY(shutdownMessage);
+}
+
+
+// This test verifies that duplicate agent registration attempts are
+// ignored when the ongoing registration is pending in the authorizer.
+TEST_F(MasterAuthorizationTest, RetryRegisterAgent)
+{
+  // Use a paused clock to control agent registration retries.
+  Clock::pause();
+
+  MockAuthorizer authorizer;
+  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  // Return a pending future from authorizer.
+  Future<Nothing> authorize;
+  Promise<bool> promise;
+
+  // Expect the authorizer to be called only once, i.e.,
+  // the retry is ignored.
+  EXPECT_CALL(authorizer, authorized(_))
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
+                    Return(promise.future())));
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  // Trigger the first registration attempt (with authentication).
+  Clock::advance(slave::DEFAULT_REGISTRATION_BACKOFF_FACTOR);
+
+  // Wait until the authorization is in progress.
+  AWAIT_READY(authorize);
+
+  // Advance to trigger the second registration attempt.
+  Clock::advance(slave::REGISTER_RETRY_INTERVAL_MAX);
+
+  // Settle to make sure the second registration attempt is received
+  // by the master. We can verify that it's ignored if the EXPECT_CALL
+  // above doesn't oversaturate.
+  Clock::settle();
+
+  Future<Message> slaveRegisteredMessage =
+    FUTURE_MESSAGE(Eq(SlaveRegisteredMessage().GetTypeName()), _, _);
+
+  // Now authorize the agent and verify it's registered.
+  promise.set(true);
+
+  AWAIT_READY(slaveRegisteredMessage);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to