This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit d8e07f5f74ed911ce9be7c9f463875bcf2e2a9d9 Author: Joseph Wu <[email protected]> AuthorDate: Fri Jun 21 08:58:50 2019 -0700 Added registry operations for DE/RE-ACTIVATE_AGENT calls. This adds the associated registry operation and fields for the DEACTIVATE_AGENT and REACTIVATE_AGENT master calls. Like the DRAIN_AGENT call, the deactivation state also persists when agents become unreachable/reachable, which is already handled by the DRAIN_AGENT operation implementation. Likewise, this feature is not downgrade compatible, and a minimum capability is added when the deactivation feature is used. If all draining or deactivated agents are reactivated, the minimum capability is removed. Review: https://reviews.apache.org/r/70924 --- src/master/registry_operations.cpp | 130 ++++++++++++++++ src/master/registry_operations.hpp | 31 ++++ src/tests/registrar_tests.cpp | 312 +++++++++++++++++++++++++++++++++++++ 3 files changed, 473 insertions(+) diff --git a/src/master/registry_operations.cpp b/src/master/registry_operations.cpp index c59939b..65c1f1c 100644 --- a/src/master/registry_operations.cpp +++ b/src/master/registry_operations.cpp @@ -488,6 +488,136 @@ Try<bool> MarkAgentDrained::perform( return found; // Mutation if found. } + +DeactivateAgent::DeactivateAgent( + const SlaveID& _slaveId) + : slaveId(_slaveId) +{} + + +Try<bool> DeactivateAgent::perform( + Registry* registry, hashset<SlaveID>* slaveIDs) +{ + // Check whether the slave is in the admitted list. + bool found = false; + if (slaveIDs->contains(slaveId)) { + found = true; + + for (int i = 0; i < registry->slaves().slaves().size(); i++) { + if (registry->slaves().slaves(i).info().id() == slaveId) { + Registry::Slave* slave = registry->mutable_slaves()->mutable_slaves(i); + + // Set the deactivated boolean. + slave->set_deactivated(true); + break; + } + } + } + + // If not found above, check the unreachable list. + if (!found) { + for (int i = 0; i < registry->unreachable().slaves().size(); i++) { + if (registry->unreachable().slaves(i).id() == slaveId) { + Registry::UnreachableSlave* slave = + registry->mutable_unreachable()->mutable_slaves(i); + + // Set the deactivated boolean. + slave->set_deactivated(true); + found = true; + break; + } + } + } + + // Make sure the AGENT_DRAINING minimum capability is present or added. + if (found) { + protobuf::master::addMinimumCapability( + registry->mutable_minimum_capabilities(), + MasterInfo::Capability::AGENT_DRAINING); + } + + return found; // Mutation if found. +} + + +ReactivateAgent::ReactivateAgent( + const SlaveID& _slaveId) + : slaveId(_slaveId) +{} + + +Try<bool> ReactivateAgent::perform( + Registry* registry, hashset<SlaveID>* slaveIDs) +{ + // Check whether the slave is in the admitted list. + bool found = false; + bool moreThanOneDeactivated = false; + + for (int i = 0; i < registry->slaves().slaves().size(); i++) { + if (registry->slaves().slaves(i).info().id() == slaveId) { + Registry::Slave* slave = registry->mutable_slaves()->mutable_slaves(i); + + // Clear the draining and deactivated states. + slave->clear_drain_info(); + slave->clear_deactivated(); + found = true; + + if (moreThanOneDeactivated) { + break; + } + + continue; + } + + if (registry->slaves().slaves(i).deactivated()) { + moreThanOneDeactivated = true; + + if (found) { + break; + } + } + } + + // Check the unreachable list too. + if (!found || !moreThanOneDeactivated) { + for (int i = 0; i < registry->unreachable().slaves().size(); i++) { + if (registry->unreachable().slaves(i).id() == slaveId) { + Registry::UnreachableSlave* slave = + registry->mutable_unreachable()->mutable_slaves(i); + + // Clear the draining and deactivated states. + slave->clear_drain_info(); + slave->clear_deactivated(); + found = true; + + if (moreThanOneDeactivated) { + break; + } + + continue; + } + + if (registry->unreachable().slaves(i).deactivated()) { + moreThanOneDeactivated = true; + + if (found) { + break; + } + } + } + } + + // If this is the last deactivated agent, + // remove the AGENT_DRAINING minimum capability. + if (found && !moreThanOneDeactivated) { + protobuf::master::removeMinimumCapability( + registry->mutable_minimum_capabilities(), + MasterInfo::Capability::AGENT_DRAINING); + } + + return found; // Mutation if found. +} + } // namespace master { } // namespace internal { } // namespace mesos { diff --git a/src/master/registry_operations.hpp b/src/master/registry_operations.hpp index c7b072b..e8229aa 100644 --- a/src/master/registry_operations.hpp +++ b/src/master/registry_operations.hpp @@ -171,6 +171,37 @@ private: const SlaveID slaveId; }; + +// Marks an existing agent as deactivated. +// Also adds a minimum capability to the master for AGENT_DRAINING. +class DeactivateAgent : public RegistryOperation +{ +public: + DeactivateAgent(const SlaveID& _slaveId); + +protected: + Try<bool> perform(Registry* registry, hashset<SlaveID>* slaveIDs) override; + +private: + const SlaveID slaveId; +}; + + +// Clears draining or deactivation from an existing agent. +// If there are no remaining draining or deactivated agents, +// this also clears the minimum capability for AGENT_DRAINING. +class ReactivateAgent : public RegistryOperation +{ +public: + ReactivateAgent(const SlaveID& _slaveId); + +protected: + Try<bool> perform(Registry* registry, hashset<SlaveID>* slaveIDs) override; + +private: + const SlaveID slaveId; +}; + } // namespace master { } // namespace internal { } // namespace mesos { diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp index d317a93..ff7f933 100644 --- a/src/tests/registrar_tests.cpp +++ b/src/tests/registrar_tests.cpp @@ -1099,6 +1099,318 @@ TEST_F(RegistrarTest, MarkAgentDrained) } +TEST_F(RegistrarTest, DeactivateAgent) +{ + SlaveID notAdmittedID; + notAdmittedID.set_value("not-admitted"); + + { + // Prepare the registrar. + Registrar registrar(flags, state); + AWAIT_READY(registrar.recover(master)); + + // Add an agent to be marked by other operations. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new AdmitSlave(slave)))); + + // Try to mark an unknown agent for draining. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DeactivateAgent(notAdmittedID)))); + } + + { + // Check that the agent is admitted and is not deactivated. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(1, registry->slaves().slaves().size()); + EXPECT_EQ(slave, registry->slaves().slaves(0).info()); + EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info()); + EXPECT_FALSE(registry->slaves().slaves(0).deactivated()); + + // No minimum capability should be added when the operation does + // not mutate anything. + EXPECT_EQ(0, registry->minimum_capabilities().size()); + + // Deactivate the admitted agent this time. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DeactivateAgent(slave.id())))); + } + + { + // Check that agent is now deactivated. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(1, registry->slaves().slaves().size()); + EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info()); + EXPECT_TRUE(registry->slaves().slaves(0).deactivated()); + + // Minimum capability should be added now. + ASSERT_EQ(1, registry->minimum_capabilities().size()); + EXPECT_EQ( + MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING), + registry->minimum_capabilities(0).capability()); + + // Mark the agent unreachable. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new MarkSlaveUnreachable(slave, protobuf::getCurrentTime())))); + } + + { + // Check that unreachable agent retains the deactivation. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + EXPECT_EQ(0, registry->slaves().slaves().size()); + ASSERT_EQ(1, registry->unreachable().slaves().size()); + EXPECT_FALSE(registry->unreachable().slaves(0).has_drain_info()); + EXPECT_TRUE(registry->unreachable().slaves(0).deactivated()); + + // Mark the agent reachable. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new MarkSlaveReachable(slave)))); + } + + { + // Check that reachable agent retains the deactivation. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(1, registry->slaves().slaves().size()); + EXPECT_EQ(0, registry->unreachable().slaves().size()); + EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info()); + EXPECT_TRUE(registry->slaves().slaves(0).deactivated()); + } +} + + +// Checks that reactivating agents will remove the draining/deactivated +// metadata and the AGENT_DRAINING minimum capability correctly. +TEST_F(RegistrarTest, ReactivateAgent) +{ + SlaveID reachable1; + reachable1.set_value("reachable1"); + + SlaveID reachable2; + reachable2.set_value("reachable2"); + + SlaveID unreachable1; + unreachable1.set_value("unreachable1"); + + SlaveID unreachable2; + unreachable2.set_value("unreachable2"); + + SlaveInfo info1; + info1.set_hostname("localhost"); + info1.mutable_id()->CopyFrom(reachable1); + + SlaveInfo info2; + info2.set_hostname("localhost"); + info2.mutable_id()->CopyFrom(reachable2); + + SlaveInfo info3; + info3.set_hostname("localhost"); + info3.mutable_id()->CopyFrom(unreachable1); + + SlaveInfo info4; + info4.set_hostname("localhost"); + info4.mutable_id()->CopyFrom(unreachable2); + + { + // Prepare the registrar. + Registrar registrar(flags, state); + AWAIT_READY(registrar.recover(master)); + + // Add all the agents. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new AdmitSlave(info1)))); + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new AdmitSlave(info2)))); + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new AdmitSlave(info3)))); + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new AdmitSlave(info4)))); + + // Mark two agents as unreachable. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new MarkSlaveUnreachable(info3, protobuf::getCurrentTime())))); + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new MarkSlaveUnreachable(info4, protobuf::getCurrentTime())))); + + // Two reachable deactivated agents. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DeactivateAgent(reachable1)))); + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DeactivateAgent(reachable2)))); + } + + { + // Check for two deactivated agents and the minimum capability. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(2, registry->slaves().slaves().size()); + ASSERT_EQ(2, registry->unreachable().slaves().size()); + + EXPECT_TRUE(registry->slaves().slaves(0).deactivated()); + EXPECT_TRUE(registry->slaves().slaves(1).deactivated()); + + ASSERT_EQ(1, registry->minimum_capabilities().size()); + EXPECT_EQ( + MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING), + registry->minimum_capabilities(0).capability()); + + // Reactivate one agent. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new ReactivateAgent(reachable1)))); + } + + { + // Check for one deactivated agent and + // that the minimum capability is still present. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(2, registry->slaves().slaves().size()); + ASSERT_EQ(2, registry->unreachable().slaves().size()); + + EXPECT_FALSE(registry->slaves().slaves(0).deactivated()); + EXPECT_TRUE(registry->slaves().slaves(1).deactivated()); + + ASSERT_EQ(1, registry->minimum_capabilities().size()); + EXPECT_EQ( + MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING), + registry->minimum_capabilities(0).capability()); + + // Reactivate the other agent. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new ReactivateAgent(reachable2)))); + } + + { + // Check for one deactivated agent and + // that the minimum capability is still present. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(2, registry->slaves().slaves().size()); + ASSERT_EQ(2, registry->unreachable().slaves().size()); + + EXPECT_FALSE(registry->slaves().slaves(0).deactivated()); + EXPECT_FALSE(registry->slaves().slaves(1).deactivated()); + + ASSERT_EQ(0, registry->minimum_capabilities().size()); + + // Two unreachable deactivated agents. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DeactivateAgent(unreachable1)))); + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DeactivateAgent(unreachable2)))); + + // Try reactivating an agent that is already active. + // This should not result in a change. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new ReactivateAgent(reachable1)))); + } + + { + // Again, check for two deactivated agents and the minimum capability. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(2, registry->slaves().slaves().size()); + ASSERT_EQ(2, registry->unreachable().slaves().size()); + + EXPECT_TRUE(registry->unreachable().slaves(0).deactivated()); + EXPECT_TRUE(registry->unreachable().slaves(1).deactivated()); + + ASSERT_EQ(1, registry->minimum_capabilities().size()); + EXPECT_EQ( + MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING), + registry->minimum_capabilities(0).capability()); + + // Reactivate one. This time, in the opposite order. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new ReactivateAgent(unreachable2)))); + } + + { + // Should be one deactivated agent, with minimum capability still present. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(2, registry->slaves().slaves().size()); + ASSERT_EQ(2, registry->unreachable().slaves().size()); + + EXPECT_TRUE(registry->unreachable().slaves(0).deactivated()); + EXPECT_FALSE(registry->unreachable().slaves(1).deactivated()); + + ASSERT_EQ(1, registry->minimum_capabilities().size()); + EXPECT_EQ( + MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING), + registry->minimum_capabilities(0).capability()); + + // Reactivate the other one. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new ReactivateAgent(unreachable1)))); + } + + { + // No deactivated agents, with no minimum capability. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(2, registry->slaves().slaves().size()); + ASSERT_EQ(2, registry->unreachable().slaves().size()); + + EXPECT_FALSE(registry->unreachable().slaves(0).deactivated()); + EXPECT_FALSE(registry->unreachable().slaves(1).deactivated()); + + ASSERT_EQ(0, registry->minimum_capabilities().size()); + + // Now try deactivate reachable and unreachable together. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DeactivateAgent(reachable1)))); + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DeactivateAgent(unreachable1)))); + + // We'll skip a validation step here and reactivate one right away. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new ReactivateAgent(reachable1)))); + } + + { + // Minimum capability should not be removed if an unreachable agent + // is deactivated. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(2, registry->slaves().slaves().size()); + ASSERT_EQ(2, registry->unreachable().slaves().size()); + + EXPECT_FALSE(registry->slaves().slaves(0).deactivated()); + EXPECT_TRUE(registry->unreachable().slaves(0).deactivated()); + + ASSERT_EQ(1, registry->minimum_capabilities().size()); + EXPECT_EQ( + MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING), + registry->minimum_capabilities(0).capability()); + } +} + + // Tests that adding and updating quotas in the registry works properly. TEST_F(RegistrarTest, UpdateQuota) {
