This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 7f89fccb6f958b04467d5179656f001be96bf30b
Author: Joseph Wu <[email protected]>
AuthorDate: Fri Jun 21 08:05:00 2019 -0700

    Added a registry operation for the DRAIN_AGENT call.
    
    This adds the associated registry operation and fields for the
    DRAIN_AGENT master call.  This call affects admitted or unreachable
    agents, but this commit only deals with admitted agents.
    
    Because this feature is not downgrade compatible, a minimum capability
    is added when the draining feature is used.
    
    Review: https://reviews.apache.org/r/70923
---
 src/master/registry.proto          |  9 ++++++
 src/master/registry_operations.cpp | 48 +++++++++++++++++++++++++++++
 src/master/registry_operations.hpp | 20 ++++++++++++
 src/tests/registrar_tests.cpp      | 63 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 140 insertions(+)

diff --git a/src/master/registry.proto b/src/master/registry.proto
index 67904ed..435b9e1 100644
--- a/src/master/registry.proto
+++ b/src/master/registry.proto
@@ -41,6 +41,15 @@ message Registry {
 
   message Slave {
     required SlaveInfo info = 1;
+
+    // If set, this agent is marked for draining and should be sent the
+    // appropriate DrainSlaveMessage upon reregistration.
+    optional DrainInfo drain_info = 2;
+
+    // If true, this agent should not be included in any offers,
+    // but should otherwise operate normally.
+    // If the `DrainInfo` is set, this value must also be set to `true`.
+    optional bool deactivated = 3 [default = false];
   }
 
   message Slaves {
diff --git a/src/master/registry_operations.cpp 
b/src/master/registry_operations.cpp
index c417c4d..00dca41 100644
--- a/src/master/registry_operations.cpp
+++ b/src/master/registry_operations.cpp
@@ -18,6 +18,7 @@
 
 #include "master/registry_operations.hpp"
 
+#include "common/protobuf_utils.hpp"
 #include "common/resources_utils.hpp"
 
 namespace mesos {
@@ -347,6 +348,53 @@ Try<bool> MarkSlaveGone::perform(Registry* registry, 
hashset<SlaveID>* slaveIDs)
   return Error("Failed to find agent " + stringify(id));
 }
 
+
+DrainAgent::DrainAgent(
+    const SlaveID& _slaveId,
+    const Option<DurationInfo>& _maxGracePeriod,
+    const bool _markGone)
+  : slaveId(_slaveId),
+    maxGracePeriod(_maxGracePeriod),
+    markGone(_markGone)
+{}
+
+
+Try<bool> DrainAgent::perform(Registry* registry, hashset<SlaveID>* slaveIDs)
+{
+  // Check whether the slave is in the admitted list.
+  bool found = false;
+  if (slaveIDs->contains(slaveId)) {
+    found = true;
+
+    for (int i = 0; i < registry->slaves().slaves().size(); i++) {
+      if (registry->slaves().slaves(i).info().id() == slaveId) {
+        Registry::Slave* slave = registry->mutable_slaves()->mutable_slaves(i);
+
+        slave->mutable_drain_info()->set_state(DRAINING);
+
+        // Copy the DrainConfig and ensure the agent is deactivated.
+        if (maxGracePeriod.isSome()) {
+          slave->mutable_drain_info()->mutable_config()
+            ->mutable_max_grace_period()->CopyFrom(maxGracePeriod.get());
+        }
+
+        slave->mutable_drain_info()->mutable_config()->set_mark_gone(markGone);
+        slave->set_deactivated(true);
+        break;
+      }
+    }
+  }
+
+  // Make sure the AGENT_DRAINING minimum capability is present or added.
+  if (found) {
+    protobuf::master::addMinimumCapability(
+        registry->mutable_minimum_capabilities(),
+        MasterInfo::Capability::AGENT_DRAINING);
+  }
+
+  return found; // Mutation if found.
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/master/registry_operations.hpp 
b/src/master/registry_operations.hpp
index 5a3010d..4f75735 100644
--- a/src/master/registry_operations.hpp
+++ b/src/master/registry_operations.hpp
@@ -137,6 +137,26 @@ private:
   const TimeInfo goneTime;
 };
 
+
+// Marks an existing agent for draining.
+// Also adds a minimum capability to the master for AGENT_DRAINING.
+class DrainAgent : public RegistryOperation
+{
+public:
+  DrainAgent(
+      const SlaveID& _slaveId,
+      const Option<DurationInfo>& _maxGracePeriod,
+      const bool _markGone);
+
+protected:
+  Try<bool> perform(Registry* registry, hashset<SlaveID>* slaveIDs) override;
+
+private:
+  const SlaveID slaveId;
+  const Option<DurationInfo> maxGracePeriod;
+  const bool markGone;
+};
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 5f0de52..b4f2e25 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -911,6 +911,69 @@ TEST_F(RegistrarTest, StopMaintenance)
 }
 
 
+// Marks an agent for draining and checks for the appropriate data.
+TEST_F(RegistrarTest, DrainAgent)
+{
+  SlaveID notAdmittedID;
+  notAdmittedID.set_value("not-admitted");
+
+  {
+    // Prepare the registrar.
+    Registrar registrar(flags, state);
+    AWAIT_READY(registrar.recover(master));
+
+    // Add an agent to be marked by other operations.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new AdmitSlave(slave))));
+
+    // Try to mark an unknown agent for draining.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DrainAgent(notAdmittedID, None(), false))));
+  }
+
+  {
+    // Check that the agent is admitted, but has no DrainConfig.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    EXPECT_EQ(slave, registry->slaves().slaves(0).info());
+    EXPECT_FALSE(registry->slaves().slaves(0).has_drain_info());
+    EXPECT_FALSE(registry->slaves().slaves(0).deactivated());
+
+    // No minimum capability should be added when the operation does
+    // not mutate anything.
+    EXPECT_EQ(0, registry->minimum_capabilities().size());
+
+    // Drain an admitted agent.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DrainAgent(slave.id(), None(), true))));
+  }
+
+  {
+    // Check that agent is now marked for draining.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    ASSERT_TRUE(registry->slaves().slaves(0).has_drain_info());
+    EXPECT_EQ(DRAINING, registry->slaves().slaves(0).drain_info().state());
+    EXPECT_FALSE(registry->slaves().slaves(0)
+      .drain_info().config().has_max_grace_period());
+    
EXPECT_TRUE(registry->slaves().slaves(0).drain_info().config().mark_gone());
+    EXPECT_TRUE(registry->slaves().slaves(0).deactivated());
+
+    // Minimum capability should be added now.
+    ASSERT_EQ(1, registry->minimum_capabilities().size());
+    EXPECT_EQ(
+        
MasterInfo_Capability_Type_Name(MasterInfo::Capability::AGENT_DRAINING),
+        registry->minimum_capabilities(0).capability());
+  }
+}
+
+
 // Tests that adding and updating quotas in the registry works properly.
 TEST_F(RegistrarTest, UpdateQuota)
 {

Reply via email to