This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3348b5cfcda9f10b1d683505352f7245b96127d5
Author: Joseph Wu <[email protected]>
AuthorDate: Wed Jul 3 10:17:46 2019 -0700

    Implemented transition from DRAINING to DRAINED in master.
    
    This adds logic in the master to detect when a DRAINING agent can
    be transitioned into a DRAINED state.  When this happens, the new
    state is checkpointed into the registry and, if the agent is to be
    marked "gone", the master will remove the agent.
    
    Review: https://reviews.apache.org/r/71008
---
 src/master/http.cpp   |  4 +++
 src/master/master.cpp | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++
 src/master/master.hpp |  6 ++++
 3 files changed, 106 insertions(+)

diff --git a/src/master/http.cpp b/src/master/http.cpp
index 13db2d8..2b3faa7 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3928,6 +3928,10 @@ Future<Response> Master::Http::_drainAgent(
             DrainSlaveMessage message;
             message.mutable_config()->CopyFrom(drainConfig);
             master->send(slave->pid, message);
+
+            // Check if the agent is already drained and transition it
+            // appropriately if so.
+            master->checkAndTransitionDrainingAgent(slave);
           }
 
           return OK();
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 19275d5..61d0e7b 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6322,6 +6322,85 @@ void Master::declineInverseOffers(
 }
 
 
+void Master::checkAndTransitionDrainingAgent(Slave* slave)
+{
+  CHECK_NOTNULL(slave);
+
+  const SlaveID& slaveId = slave->id;
+
+  if (!slaves.draining.contains(slaveId) ||
+      slaves.draining.at(slaveId).state() == DRAINED) {
+    // Nothing to do for non-draining or already drained agents.
+    return;
+  }
+
+  // Check if the agent has any tasks running or operations pending.
+  if (!slave->pendingTasks.empty() ||
+      !slave->tasks.empty() ||
+      !slave->operations.empty()) {
+    VLOG(1)
+      << "DRAINING Agent " << slaveId << " has "
+      << slave->pendingTasks.size() << " pending tasks, "
+      << slave->tasks.size() << " tasks, and "
+      << slave->operations.size() << " operations";
+    return;
+  }
+
+  if (slaves.markingGone.contains(slaveId)) {
+    LOG(INFO)
+      << "Ignoring transition of agent " << slaveId << " to the DRAINED"
+      << " state because agent is being marked gone";
+    return;
+  }
+
+  // If the agent will be marked gone afterwards, we do not need to mark
+  // the agent as DRAINED. Simply marking gone will suffice.
+  if (slaves.draining.at(slaveId).config().mark_gone()) {
+    LOG(INFO) << "Marking agent " << slaveId << " in the DRAINED state as gone";
+
+    slaves.markingGone.insert(slaveId);
+
+    TimeInfo goneTime = protobuf::getCurrentTime();
+
+    registrar->apply(Owned<RegistryOperation>(
+        new MarkSlaveGone(slaveId, goneTime)))
+      .onAny(defer(
+          self(),
+          [this, slaveId, goneTime](const Future<bool>& result) {
+            CHECK_READY(result)
+              << "Failed to mark agent gone in the registry";
+
+            markGone(slaveId, goneTime);
+          }));
+  } else {
+    LOG(INFO) << "Transitioning agent " << slaveId << " to the DRAINED state";
+
+    registrar->apply(Owned<RegistryOperation>(
+        new MarkAgentDrained(slaveId)))
+      .onAny(defer(
+          self(),
+          [this, slaveId](const Future<bool>& result) {
+            CHECK_READY(result)
+              << "Failed to update draining info in the registry";
+
+            // This can happen if the agent sends an UnregisterSlaveMessage
+            // right before this method is called.
+            if (!slaves.draining.contains(slaveId)) {
+              LOG(INFO)
+                << "Agent " << slaveId << " was removed while being"
+                << " marked as DRAINED";
+              return;
+            }
+
+            slaves.draining[slaveId].set_state(DRAINED);
+
+            LOG(INFO)
+              << "Agent " << slaveId << " successfully marked as DRAINED";
+          }));
+  }
+}
+
+
 void Master::reviveOffers(
     const UPID& from,
     const FrameworkID& frameworkId,
@@ -6686,6 +6765,8 @@ void Master::acknowledge(
   send(slave->pid, message);
 
   metrics->valid_status_update_acknowledgements++;
+
+  checkAndTransitionDrainingAgent(slave);
 }
 
 
@@ -6817,6 +6898,8 @@ void Master::acknowledgeOperationStatus(
   send(slave->pid, message);
 
   metrics->valid_operation_status_update_acknowledgements++;
+
+  checkAndTransitionDrainingAgent(slave);
 }
 
 
@@ -8075,6 +8158,10 @@ void Master::___reregisterSlave(
         slaves.draining.at(slaveInfo.id()).config());
 
     send(slave->pid, message);
+
+    // Check if the agent is already drained and transition it
+    // appropriately if so.
+    checkAndTransitionDrainingAgent(slave);
   }
 
   // Inform the agent of the new framework pids for its tasks, and
@@ -9471,6 +9558,8 @@ void Master::markGone(const SlaveID& slaveId, const TimeInfo& goneTime)
     }
 
     slaves.unreachable.erase(slaveId);
+    slaves.draining.erase(slaveId);
+    slaves.deactivated.erase(slaveId);
 
     // TODO(vinod): Consider moving these tasks into `completedTasks` by
     // transitioning them to a terminal state and sending status updates.
@@ -11684,6 +11773,13 @@ void Master::_removeSlave(
   CHECK(machines[slave->machineId].slaves.contains(slave->id));
   machines[slave->machineId].slaves.erase(slave->id);
 
+  // Remove any draining information about the agent.
+  // NOTE: This should not be mirrored by `__removeSlave` because that
+  // method handles both unreachable and gone agents. Unreachable agents
+  // will retain their draining information, but gone agents will not.
+  slaves.draining.erase(slave->id);
+  slaves.deactivated.erase(slave->id);
+
   // Kill the slave observer.
   terminate(slave->observer);
   wait(slave->observer);
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 205cdb2..ffa7423 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1108,6 +1108,12 @@ private:
       Framework* framework,
       const mesos::scheduler::Call::DeclineInverseOffers& decline);
 
+  // Should be called after each terminal task status update acknowledgement
+  // or terminal operation acknowledgement. If an agent is draining, this
+  // checks if all pending tasks or operations have terminated and then
+  // transitions the DRAINING agent to DRAINED.
+  void checkAndTransitionDrainingAgent(Slave* slave);
+
   void revive(
       Framework* framework,
       const mesos::scheduler::Call::Revive& revive);

Reply via email to