This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 7f89fccb6f958b04467d5179656f001be96bf30b Author: Joseph Wu <[email protected]> AuthorDate: Fri Jun 21 08:05:00 2019 -0700 Added a registry operation for the DRAIN_AGENT call. This adds the associated registry operation and fields for the DRAIN_AGENT master call. This call affects admitted or unreachable agents, but this commit only deals with admitted agents. Because this feature is not downgrade compatible, a minimum capability is added when the draining feature is used. Review: https://reviews.apache.org/r/70923 --- src/master/registry.proto | 9 ++++++ src/master/registry_operations.cpp | 48 +++++++++++++++++++++++++++++ src/master/registry_operations.hpp | 20 ++++++++++++ src/tests/registrar_tests.cpp | 63 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 140 insertions(+) diff --git a/src/master/registry.proto b/src/master/registry.proto index 67904ed..435b9e1 100644 --- a/src/master/registry.proto +++ b/src/master/registry.proto @@ -41,6 +41,15 @@ message Registry { message Slave { required SlaveInfo info = 1; + + // If set, this agent is marked for draining and should be sent the + // appropriate DrainSlaveMessage upon reregistration. + optional DrainInfo drain_info = 2; + + // If true, this agent should not be included in any offers, + // but should otherwise operate normally. + // If the `DrainInfo` is set, this value must also be set to `true`. + optional bool deactivated = 3 [default = false]; } message Slaves { diff --git a/src/master/registry_operations.cpp b/src/master/registry_operations.cpp index c417c4d..00dca41 100644 --- a/src/master/registry_operations.cpp +++ b/src/master/registry_operations.cpp @@ -18,6 +18,7 @@ #include "master/registry_operations.hpp" +#include "common/protobuf_utils.hpp" #include "common/resources_utils.hpp" namespace mesos { @@ -347,6 +348,53 @@ Try<bool> MarkSlaveGone::perform(Registry* registry, hashset<SlaveID>* slaveIDs) return Error("Failed to find agent " + stringify(id)); } + +DrainAgent::DrainAgent( + const SlaveID& _slaveId, + const Option<DurationInfo>& _maxGracePeriod, + const bool _markGone) + : slaveId(_slaveId), + maxGracePeriod(_maxGracePeriod), + markGone(_markGone) +{} + + +Try<bool> DrainAgent::perform(Registry* registry, hashset<SlaveID>* slaveIDs) +{ + // Check whether the slave is in the admitted list. + bool found = false; + if (slaveIDs->contains(slaveId)) { + found = true; + + for (int i = 0; i < registry->slaves().slaves().size(); i++) { + if (registry->slaves().slaves(i).info().id() == slaveId) { + Registry::Slave* slave = registry->mutable_slaves()->mutable_slaves(i); + + slave->mutable_drain_info()->set_state(DRAINING); + + // Copy the DrainConfig and ensure the agent is deactivated. + if (maxGracePeriod.isSome()) { + slave->mutable_drain_info()->mutable_config() + ->mutable_max_grace_period()->CopyFrom(maxGracePeriod.get()); + } + + slave->mutable_drain_info()->mutable_config()->set_mark_gone(markGone); + slave->set_deactivated(true); + break; + } + } + } + + // Make sure the AGENT_DRAINING minimum capability is present or added. + if (found) { + protobuf::master::addMinimumCapability( + registry->mutable_minimum_capabilities(), + MasterInfo::Capability::AGENT_DRAINING); + } + + return found; // Mutation if found. +} + } // namespace master { } // namespace internal { } // namespace mesos { diff --git a/src/master/registry_operations.hpp b/src/master/registry_operations.hpp index 5a3010d..4f75735 100644 --- a/src/master/registry_operations.hpp +++ b/src/master/registry_operations.hpp @@ -137,6 +137,26 @@ private: const TimeInfo goneTime; }; + +// Marks an existing agent for draining. +// Also adds a minimum capability to the master for AGENT_DRAINING. +class DrainAgent : public RegistryOperation +{ +public: + DrainAgent( + const SlaveID& _slaveId, + const Option<DurationInfo>& _maxGracePeriod, + const bool _markGone); + +protected: + Try<bool> perform(Registry* registry, hashset<SlaveID>* slaveIDs) override; + +private: + const SlaveID slaveId; + const Option<DurationInfo> maxGracePeriod; + const bool markGone; +}; + } // namespace master { } // namespace internal { } // namespace mesos { diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp index 5f0de52..b4f2e25 100644 --- a/src/tests/registrar_tests.cpp +++ b/src/tests/registrar_tests.cpp @@ -911,6 +911,69 @@ TEST_F(RegistrarTest, StopMaintenance) } +// Marks an agent for draining and checks for the appropriate data. +TEST_F(RegistrarTest, DrainAgent) +{ + SlaveID notAdmittedID; + notAdmittedID.set_value("not-admitted"); + + { + // Prepare the registrar. + Registrar registrar(flags, state); + AWAIT_READY(registrar.recover(master)); + + // Add an agent to be marked by other operations. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new AdmitSlave(slave)))); + + // Try to mark an unknown agent for draining. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DrainAgent(notAdmittedID, None(), false)))); + } + + { + // Check that the agent is admitted, but has no DrainConfig. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(1, registry->slaves().slaves().size()); + EXPECT_EQ(slave, registry->slaves().slaves(0).info()); + EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info()); + EXPECT_FALSE(registry->slaves().slaves(0).deactivated()); + + // No minimum capability should be added when the operation does + // not mutate anything. + EXPECT_EQ(0, registry->minimum_capabilities().size()); + + // Drain an admitted agent. + AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>( + new DrainAgent(slave.id(), None(), true)))); + } + + { + // Check that agent is now marked for draining. + Registrar registrar(flags, state); + Future<Registry> registry = registrar.recover(master); + AWAIT_READY(registry); + + ASSERT_EQ(1, registry->slaves().slaves().size()); + ASSERT_TRUE(registry->slaves().slaves(0).has_drain_info()); + EXPECT_EQ(DRAINING, registry->slaves().slaves(0).drain_info().state()); + EXPECT_FALSE(registry->slaves().slaves(0) + .drain_info().config().has_max_grace_period()); + EXPECT_TRUE(registry->slaves().slaves(0).drain_info().config().mark_gone()); + EXPECT_TRUE(registry->slaves().slaves(0).deactivated()); + + // Minimum capability should be added now. + ASSERT_EQ(1, registry->minimum_capabilities().size()); + EXPECT_EQ( + MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING), + registry->minimum_capabilities(0).capability()); + } +} + + // Tests that adding and updating quotas in the registry works properly. TEST_F(RegistrarTest, UpdateQuota) {
