This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit fb654756464517e3c15fd014f91dd1d3b6c33793
Author: Joseph Wu <[email protected]>
AuthorDate: Wed Jun 26 08:08:36 2019 -0700

    Added registry operation for marking an agent as drained.
    
    This adds an operation purely for transitioning an agent from the
    DRAINING state to the DRAINED state.  The master is expected to
    validate that the targetted agent is DRAINING, otherwise, the
    write will fail (which results in the master aborting).
    
    Review: https://reviews.apache.org/r/70957
---
 src/master/registry_operations.cpp | 54 ++++++++++++++++++++++++
 src/master/registry_operations.hpp | 14 +++++++
 src/tests/registrar_tests.cpp      | 84 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 152 insertions(+)

diff --git a/src/master/registry_operations.cpp 
b/src/master/registry_operations.cpp
index c286f6c..c59939b 100644
--- a/src/master/registry_operations.cpp
+++ b/src/master/registry_operations.cpp
@@ -434,6 +434,60 @@ Try<bool> DrainAgent::perform(Registry* registry, 
hashset<SlaveID>* slaveIDs)
   return found; // Mutation if found.
 }
 
+
+MarkAgentDrained::MarkAgentDrained(
+    const SlaveID& _slaveId)
+  : slaveId(_slaveId)
+{}
+
+
+Try<bool> MarkAgentDrained::perform(
+    Registry* registry, hashset<SlaveID>* slaveIDs)
+{
+  // Check whether the slave is in the admitted list.
+  bool found = false;
+  if (slaveIDs->contains(slaveId)) {
+    found = true;
+
+    for (int i = 0; i < registry->slaves().slaves().size(); i++) {
+      if (registry->slaves().slaves(i).info().id() == slaveId) {
+        Registry::Slave* slave = registry->mutable_slaves()->mutable_slaves(i);
+
+        // NOTE: This should be validated/prevented on the master side.
+        if (!slave->has_drain_info() ||
+            slave->drain_info().state() != DRAINING) {
+          return Error("Agent " + stringify(slaveId) + " is not DRAINING");
+        }
+
+        slave->mutable_drain_info()->set_state(DRAINED);
+        break;
+      }
+    }
+  }
+
+  // If not found above, check the unreachable list.
+  if (!found) {
+    for (int i = 0; i < registry->unreachable().slaves().size(); i++) {
+      if (registry->unreachable().slaves(i).id() == slaveId) {
+        Registry::UnreachableSlave* slave =
+          registry->mutable_unreachable()->mutable_slaves(i);
+
+        // NOTE: This should be validated/prevented on the master side.
+        if (!slave->has_drain_info() ||
+            slave->drain_info().state() != DRAINING) {
+          return Error("Agent " + stringify(slaveId) + " is not DRAINING");
+        }
+
+        slave->mutable_drain_info()->set_state(DRAINED);
+        found = true;
+        break;
+      }
+    }
+  }
+
+  return found; // Mutation if found.
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/master/registry_operations.hpp 
b/src/master/registry_operations.hpp
index 4f75735..c7b072b 100644
--- a/src/master/registry_operations.hpp
+++ b/src/master/registry_operations.hpp
@@ -157,6 +157,20 @@ private:
   const bool markGone;
 };
 
+
+// Transitions a DRAINING agent into the DRAINED state.
+class MarkAgentDrained : public RegistryOperation
+{
+public:
+  MarkAgentDrained(const SlaveID& _slaveId);
+
+protected:
+  Try<bool> perform(Registry* registry, hashset<SlaveID>* slaveIDs) override;
+
+private:
+  const SlaveID slaveId;
+};
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index d599c3c..d317a93 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -1015,6 +1015,90 @@ TEST_F(RegistrarTest, DrainAgent)
 }
 
 
+TEST_F(RegistrarTest, MarkAgentDrained)
+{
+  {
+    // Prepare the registrar.
+    Registrar registrar(flags, state);
+    AWAIT_READY(registrar.recover(master));
+
+    // Add an agent to be marked for draining.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new AdmitSlave(slave))));
+
+    // Try to mark a non-draining agent as drained. This should fail.
+    AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
+        new MarkAgentDrained(slave.id()))));
+
+    // Drain the admitted agent.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DrainAgent(slave.id(), None(), true))));
+  }
+
+  {
+    // Check that agent is now marked for draining.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    EXPECT_TRUE(registry->slaves().slaves(0).has_drain_info());
+    EXPECT_EQ(DRAINING, registry->slaves().slaves(0).drain_info().state());
+
+    // Transition from draining to drained.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new MarkAgentDrained(slave.id()))));
+  }
+
+  {
+    // Check that agent is now marked drained.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->slaves().slaves().size());
+    ASSERT_TRUE(registry->slaves().slaves(0).has_drain_info());
+    EXPECT_EQ(DRAINED, registry->slaves().slaves(0).drain_info().state());
+
+    // Try the same sequence of operations for an unreachable agent.
+    // First remove the agent.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new RemoveSlave(slave))));
+
+    // Add the agent back, anew.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new AdmitSlave(slave))));
+
+    // Mark it unreachable.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new MarkSlaveUnreachable(slave, protobuf::getCurrentTime()))));
+
+    // Try to mark the agent drained prematurely.
+    AWAIT_FALSE(registrar.apply(Owned<RegistryOperation>(
+        new MarkAgentDrained(slave.id()))));
+
+    // Now drain properly.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new DrainAgent(slave.id(), None(), true))));
+
+    // And finish draining.
+    AWAIT_TRUE(registrar.apply(Owned<RegistryOperation>(
+        new MarkAgentDrained(slave.id()))));
+  }
+
+  {
+    // Check that unreachable agent is now marked drained.
+    Registrar registrar(flags, state);
+    Future<Registry> registry = registrar.recover(master);
+    AWAIT_READY(registry);
+
+    ASSERT_EQ(1, registry->unreachable().slaves().size());
+    ASSERT_TRUE(registry->unreachable().slaves(0).has_drain_info());
+    EXPECT_EQ(DRAINED, registry->unreachable().slaves(0).drain_info().state());
+  }
+}
+
+
 // Tests that adding and updating quotas in the registry works properly.
 TEST_F(RegistrarTest, UpdateQuota)
 {

Reply via email to