This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d8e07f5f74ed911ce9be7c9f463875bcf2e2a9d9
Author: Joseph Wu <[email protected]>
AuthorDate: Fri Jun 21 08:58:50 2019 -0700

    Added registry operations for DE/RE-ACTIVATE_AGENT calls.
    
    This adds the associated registry operation and fields for the
    DEACTIVATE_AGENT and REACTIVATE_AGENT master calls.
    
    Like the DRAIN_AGENT call, the deactivation state also persists when
    agents become unreachable/reachable, which is already handled by the
    DRAIN_AGENT operation implementation.
    
    Likewise, this feature is not downgrade compatible, and a minimum
    capability is added when the deactivation feature is used.
    If all draining or deactivated agents are reactivated, the minimum
    capability is removed.
    
    Review: https://reviews.apache.org/r/70924
---
 src/master/registry_operations.cpp | 130 ++++++++++++++++
 src/master/registry_operations.hpp |  31 ++++
 src/tests/registrar_tests.cpp      | 312 +++++++++++++++++++++++++++++++++++++
 3 files changed, 473 insertions(+)

diff --git a/src/master/registry_operations.cpp b/src/master/registry_operations.cpp
index c59939b..65c1f1c 100644
--- a/src/master/registry_operations.cpp
+++ b/src/master/registry_operations.cpp
@@ -488,6 +488,136 @@ Try<bool> MarkAgentDrained::perform(
   return found; // Mutation if found.
 }
 
+
+// Stores the ID of the agent that `perform` will mark as deactivated.
+DeactivateAgent::DeactivateAgent(const SlaveID& _slaveId)
+  : slaveId(_slaveId) {}
+
+
+// Marks the agent identified by `slaveId` as deactivated.
+//
+// The admitted list is only searched when `slaveIDs` contains the ID;
+// otherwise the unreachable list is searched instead. Whenever the
+// agent counts as found, this makes sure the AGENT_DRAINING minimum
+// capability is present in the registry.
+//
+// Returns true (a mutation) if the agent was found, false otherwise.
+Try<bool> DeactivateAgent::perform(
+    Registry* registry, hashset<SlaveID>* slaveIDs)
+{
+  // Membership in `slaveIDs` alone decides whether the agent is
+  // treated as admitted (and therefore as found).
+  const bool admitted = slaveIDs->contains(slaveId);
+
+  if (admitted) {
+    const int admittedCount = registry->slaves().slaves().size();
+
+    for (int index = 0; index < admittedCount; ++index) {
+      if (!(registry->slaves().slaves(index).info().id() == slaveId)) {
+        continue;
+      }
+
+      // Flag the matching admitted entry and stop searching.
+      registry->mutable_slaves()->mutable_slaves(index)->set_deactivated(true);
+      break;
+    }
+  }
+
+  bool found = admitted;
+
+  // Agents that are not admitted may still be listed as unreachable.
+  if (!found) {
+    const int unreachableCount = registry->unreachable().slaves().size();
+
+    for (int index = 0; index < unreachableCount; ++index) {
+      if (!(registry->unreachable().slaves(index).id() == slaveId)) {
+        continue;
+      }
+
+      // Flag the matching unreachable entry and stop searching.
+      Registry::UnreachableSlave* entry =
+        registry->mutable_unreachable()->mutable_slaves(index);
+
+      entry->set_deactivated(true);
+      found = true;
+      break;
+    }
+  }
+
+  if (!found) {
+    return false; // Unknown agent, no mutation.
+  }
+
+  // Make sure the AGENT_DRAINING minimum capability is present or added.
+  protobuf::master::addMinimumCapability(
+      registry->mutable_minimum_capabilities(),
+      MasterInfo::Capability::AGENT_DRAINING);
+
+  return true; // Mutation, since the agent was found.
+}
+
+
+// Stores the ID of the agent whose draining/deactivated state
+// `perform` will clear.
+ReactivateAgent::ReactivateAgent(const SlaveID& _slaveId)
+  : slaveId(_slaveId) {}
+
+
+// Clears the draining and deactivated state of the agent identified by
+// `slaveId`, searching both the admitted and the unreachable lists.
+//
+// While searching, this also notes whether any *other* agent is still
+// draining or deactivated. If none is, the AGENT_DRAINING minimum
+// capability is removed from the registry.
+//
+// `slaveIDs` is not consulted; the registry lists are scanned directly.
+//
+// Returns true (a mutation) if the agent was found, false otherwise.
+Try<bool> ReactivateAgent::perform(
+    Registry* registry, hashset<SlaveID>* slaveIDs)
+{
+  bool found = false;
+
+  // Set once any agent other than `slaveId` is seen that still carries
+  // drain info or the deactivated flag, i.e. the minimum capability is
+  // still needed after this operation.
+  bool othersRemain = false;
+
+  // Check the admitted list.
+  for (int i = 0; i < registry->slaves().slaves().size(); i++) {
+    const Registry::Slave& entry = registry->slaves().slaves(i);
+
+    if (entry.info().id() == slaveId) {
+      Registry::Slave* slave = registry->mutable_slaves()->mutable_slaves(i);
+
+      // Clear the draining and deactivated states.
+      slave->clear_drain_info();
+      slave->clear_deactivated();
+      found = true;
+    } else if (entry.has_drain_info() || entry.deactivated()) {
+      othersRemain = true;
+    }
+
+    // Both answers are known; there is no need to look any further.
+    if (found && othersRemain) {
+      break;
+    }
+  }
+
+  // Check the unreachable list too, as long as either answer is still
+  // unknown: the agent itself may be unreachable, or the only other
+  // draining/deactivated agents may be.
+  if (!found || !othersRemain) {
+    for (int i = 0; i < registry->unreachable().slaves().size(); i++) {
+      const Registry::UnreachableSlave& entry =
+        registry->unreachable().slaves(i);
+
+      if (entry.id() == slaveId) {
+        Registry::UnreachableSlave* slave =
+          registry->mutable_unreachable()->mutable_slaves(i);
+
+        // Clear the draining and deactivated states.
+        slave->clear_drain_info();
+        slave->clear_deactivated();
+        found = true;
+      } else if (entry.has_drain_info() || entry.deactivated()) {
+        othersRemain = true;
+      }
+
+      if (found && othersRemain) {
+        break;
+      }
+    }
+  }
+
+  // If this was the last draining or deactivated agent,
+  // remove the AGENT_DRAINING minimum capability.
+  if (found && !othersRemain) {
+    protobuf::master::removeMinimumCapability(
+        registry->mutable_minimum_capabilities(),
+        MasterInfo::Capability::AGENT_DRAINING);
+  }
+
+  return found; // Mutation if found.
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/master/registry_operations.hpp b/src/master/registry_operations.hpp
index c7b072b..e8229aa 100644
--- a/src/master/registry_operations.hpp
+++ b/src/master/registry_operations.hpp
@@ -171,6 +171,37 @@ private:
   const SlaveID slaveId;
 };
 
+
+// Marks an existing agent as deactivated.
+// Also adds a minimum capability to the master for AGENT_DRAINING.
+//
+// The agent is looked up among the admitted agents first and, if it is
+// not admitted, among the unreachable agents.
+class DeactivateAgent : public RegistryOperation
+{
+public:
+  // `_slaveId` identifies the agent to deactivate.
+  DeactivateAgent(const SlaveID& _slaveId);
+
+protected:
+  // Returns true (a mutation) if the agent was found, false otherwise.
+  Try<bool> perform(Registry* registry, hashset<SlaveID>* slaveIDs) override;
+
+private:
+  // ID of the agent this operation applies to.
+  const SlaveID slaveId;
+};
+
+
+// Clears draining or deactivation from an existing agent.
+// If there are no remaining draining or deactivated agents,
+// this also clears the minimum capability for AGENT_DRAINING.
+//
+// Both the admitted and the unreachable agents are searched.
+class ReactivateAgent : public RegistryOperation
+{
+public:
+  // `_slaveId` identifies the agent to reactivate.
+  ReactivateAgent(const SlaveID& _slaveId);
+
+protected:
+  // Returns true (a mutation) if the agent was found, false otherwise.
+  Try<bool> perform(Registry* registry, hashset<SlaveID>* slaveIDs) override;
+
+private:
+  // ID of the agent this operation applies to.
+  const SlaveID slaveId;
+};
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index d317a93..ff7f933 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -1099,6 +1099,318 @@ TEST_F(RegistrarTest, MarkAgentDrained)
 }
 
 
+TEST_F(RegistrarTest, DeactivateAgent)
+{
+  SlaveID notAdmittedID;
+  notAdmittedID.set_value("not-admitted");
+
+  {
+    // Prepare the registrar.
+    Registrar registrar(flags, state);
+    AWAIT_READY(registrar.recover(master));
+
+    // Add an agent to be marked by other operations.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new AdmitSlave(slave))));
+
+    // Try to deactivate an unknown agent; this must not mutate the registry.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DeactivateAgent(notAdmittedID))));
+  }
+
+  {
+    // Check that the agent is admitted and is not deactivated.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    EXPECT_EQ(slave, registry->slaves().slaves(0).info());
+    EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info());
+    EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
+
+    // No minimum capability should be added when the operation does
+    // not mutate anything.
+    EXPECT_EQ(0, registry->minimum_capabilities().size());
+
+    // Deactivate the admitted agent this time.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DeactivateAgent(slave.id()))));
+  }
+
+  {
+    // Check that agent is now deactivated.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info());
+    EXPECT_TRUE(registry->slaves().slaves(0).deactivated());
+
+    // Minimum capability should be added now.
+    ASSERT_EQ(1, registry->minimum_capabilities().size());
+    EXPECT_EQ(
+        MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
+        registry->minimum_capabilities(0).capability());
+
+    // Mark the agent unreachable.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new MarkSlaveUnreachable(slave, protobuf::getCurrentTime()))));
+  }
+
+  {
+    // Check that unreachable agent retains the deactivation.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    EXPECT_EQ(0, registry->slaves().slaves().size());
+    ASSERT_EQ(1, registry->unreachable().slaves().size());
+    EXPECT_FALSE(registry->unreachable().slaves(0).has_drain_info());
+    EXPECT_TRUE(registry->unreachable().slaves(0).deactivated());
+
+    // Mark the agent reachable.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new MarkSlaveReachable(slave))));
+  }
+
+  {
+    // Check that reachable agent retains the deactivation.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    EXPECT_EQ(0, registry->unreachable().slaves().size());
+    EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info());
+    EXPECT_TRUE(registry->slaves().slaves(0).deactivated());
+  }
+}
+
+
+// Checks that reactivating agents will remove the draining/deactivated
+// metadata and the AGENT_DRAINING minimum capability correctly.
+TEST_F(RegistrarTest, ReactivateAgent)
+{
+  SlaveID reachable1;
+  reachable1.set_value("reachable1");
+
+  SlaveID reachable2;
+  reachable2.set_value("reachable2");
+
+  SlaveID unreachable1;
+  unreachable1.set_value("unreachable1");
+
+  SlaveID unreachable2;
+  unreachable2.set_value("unreachable2");
+
+  SlaveInfo info1;
+  info1.set_hostname("localhost");
+  info1.mutable_id()->CopyFrom(reachable1);
+
+  SlaveInfo info2;
+  info2.set_hostname("localhost");
+  info2.mutable_id()->CopyFrom(reachable2);
+
+  SlaveInfo info3;
+  info3.set_hostname("localhost");
+  info3.mutable_id()->CopyFrom(unreachable1);
+
+  SlaveInfo info4;
+  info4.set_hostname("localhost");
+  info4.mutable_id()->CopyFrom(unreachable2);
+
+  {
+    // Prepare the registrar.
+    Registrar registrar(flags, state);
+    AWAIT_READY(registrar.recover(master));
+
+    // Add all the agents.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new AdmitSlave(info1))));
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new AdmitSlave(info2))));
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new AdmitSlave(info3))));
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new AdmitSlave(info4))));
+
+    // Mark two agents as unreachable.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new MarkSlaveUnreachable(info3, protobuf::getCurrentTime()))));
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new MarkSlaveUnreachable(info4, protobuf::getCurrentTime()))));
+
+    // Two reachable deactivated agents.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DeactivateAgent(reachable1))));
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DeactivateAgent(reachable2))));
+  }
+
+  {
+    // Check for two deactivated agents and the minimum capability.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(2, registry->slaves().slaves().size());
+    ASSERT_EQ(2, registry->unreachable().slaves().size());
+
+    EXPECT_TRUE(registry->slaves().slaves(0).deactivated());
+    EXPECT_TRUE(registry->slaves().slaves(1).deactivated());
+
+    ASSERT_EQ(1, registry->minimum_capabilities().size());
+    EXPECT_EQ(
+        MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
+        registry->minimum_capabilities(0).capability());
+
+    // Reactivate one agent.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new ReactivateAgent(reachable1))));
+  }
+
+  {
+    // Check for one deactivated agent and
+    // that the minimum capability is still present.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(2, registry->slaves().slaves().size());
+    ASSERT_EQ(2, registry->unreachable().slaves().size());
+
+    EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
+    EXPECT_TRUE(registry->slaves().slaves(1).deactivated());
+
+    ASSERT_EQ(1, registry->minimum_capabilities().size());
+    EXPECT_EQ(
+        MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
+        registry->minimum_capabilities(0).capability());
+
+    // Reactivate the other agent.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new ReactivateAgent(reachable2))));
+  }
+
+  {
+    // Check that both reachable agents are active again and
+    // that the minimum capability has been removed.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(2, registry->slaves().slaves().size());
+    ASSERT_EQ(2, registry->unreachable().slaves().size());
+
+    EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
+    EXPECT_FALSE(registry->slaves().slaves(1).deactivated());
+
+    ASSERT_EQ(0, registry->minimum_capabilities().size());
+
+    // Two unreachable deactivated agents.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DeactivateAgent(unreachable1))));
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DeactivateAgent(unreachable2))));
+
+    // Try reactivating an agent that is already active.
+    // This should not result in a change.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new ReactivateAgent(reachable1))));
+  }
+
+  {
+    // Again, check for two deactivated agents and the minimum capability.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(2, registry->slaves().slaves().size());
+    ASSERT_EQ(2, registry->unreachable().slaves().size());
+
+    EXPECT_TRUE(registry->unreachable().slaves(0).deactivated());
+    EXPECT_TRUE(registry->unreachable().slaves(1).deactivated());
+
+    ASSERT_EQ(1, registry->minimum_capabilities().size());
+    EXPECT_EQ(
+        MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
+        registry->minimum_capabilities(0).capability());
+
+    // Reactivate one. This time, in the opposite order.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new ReactivateAgent(unreachable2))));
+  }
+
+  {
+    // Should be one deactivated agent, with minimum capability still present.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(2, registry->slaves().slaves().size());
+    ASSERT_EQ(2, registry->unreachable().slaves().size());
+
+    EXPECT_TRUE(registry->unreachable().slaves(0).deactivated());
+    EXPECT_FALSE(registry->unreachable().slaves(1).deactivated());
+
+    ASSERT_EQ(1, registry->minimum_capabilities().size());
+    EXPECT_EQ(
+        MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
+        registry->minimum_capabilities(0).capability());
+
+    // Reactivate the other one.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new ReactivateAgent(unreachable1))));
+  }
+
+  {
+    // No deactivated agents, with no minimum capability.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(2, registry->slaves().slaves().size());
+    ASSERT_EQ(2, registry->unreachable().slaves().size());
+
+    EXPECT_FALSE(registry->unreachable().slaves(0).deactivated());
+    EXPECT_FALSE(registry->unreachable().slaves(1).deactivated());
+
+    ASSERT_EQ(0, registry->minimum_capabilities().size());
+
+    // Now try deactivate reachable and unreachable together.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DeactivateAgent(reachable1))));
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DeactivateAgent(unreachable1))));
+
+    // We'll skip a validation step here and reactivate one right away.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new ReactivateAgent(reachable1))));
+  }
+
+  {
+    // Minimum capability should not be removed if an unreachable agent
+    // is deactivated.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(2, registry->slaves().slaves().size());
+    ASSERT_EQ(2, registry->unreachable().slaves().size());
+
+    EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
+    EXPECT_TRUE(registry->unreachable().slaves(0).deactivated());
+
+    ASSERT_EQ(1, registry->minimum_capabilities().size());
+    EXPECT_EQ(
+        MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
+        registry->minimum_capabilities(0).capability());
+  }
+}
+
+
 // Tests that adding and updating quotas in the registry works properly.
 TEST_F(RegistrarTest, UpdateQuota)
 {

Reply via email to