This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 3348b5cfcda9f10b1d683505352f7245b96127d5 Author: Joseph Wu <[email protected]> AuthorDate: Wed Jul 3 10:17:46 2019 -0700 Implemented transition from DRAINING to DRAINED in master. This adds logic in the master to detect when a DRAINING agent can be transitioned into a DRAINED state. When this happens, the new state is checkpointed into the registry and, if the agent is to be marked "gone", the master will remove the agent. Review: https://reviews.apache.org/r/71008 --- src/master/http.cpp | 4 +++ src/master/master.cpp | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/master/master.hpp | 6 ++++ 3 files changed, 106 insertions(+) diff --git a/src/master/http.cpp b/src/master/http.cpp index 13db2d8..2b3faa7 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -3928,6 +3928,10 @@ Future<Response> Master::Http::_drainAgent( DrainSlaveMessage message; message.mutable_config()->CopyFrom(drainConfig); master->send(slave->pid, message); + + // Check if the agent is already drained and transition it + // appropriately if so. + master->checkAndTransitionDrainingAgent(slave); } return OK(); diff --git a/src/master/master.cpp b/src/master/master.cpp index 19275d5..61d0e7b 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -6322,6 +6322,85 @@ void Master::declineInverseOffers( } +void Master::checkAndTransitionDrainingAgent(Slave* slave) +{ + CHECK_NOTNULL(slave); + + const SlaveID& slaveId = slave->id; + + if (!slaves.draining.contains(slaveId) || + slaves.draining.at(slaveId).state() == DRAINED) { + // Nothing to do for non-draining or already drained agents. + return; + } + + // Check if the agent has any tasks running or operations pending. + if (!slave->pendingTasks.empty() || + !slave->tasks.empty() || + !slave->operations.empty()) { + VLOG(1) + << "DRAINING Agent " << slaveId << " has " + << slave->pendingTasks.size() << " pending tasks, " + << slave->tasks.size() << " tasks, and " + << slave->operations.size() << " operations"; + return; + } + + if (slaves.markingGone.contains(slaveId)) { + LOG(INFO) + << "Ignoring transition of agent " << slaveId << " to the DRAINED" + << " state because agent is being marked gone"; + return; + } + + // If the agent will be marked gone afterwards, we do not need to mark + // the agent as DRAINED. Simply marking gone will suffice. + if (slaves.draining.at(slaveId).config().mark_gone()) { + LOG(INFO) << "Marking agent " << slaveId << " in the DRAINED state as gone"; + + slaves.markingGone.insert(slaveId); + + TimeInfo goneTime = protobuf::getCurrentTime(); + + registrar->apply(Owned<RegistryOperation>( + new MarkSlaveGone(slaveId, goneTime))) + .onAny(defer( + self(), + [this, slaveId, goneTime](const Future<bool>& result) { + CHECK_READY(result) + << "Failed to mark agent gone in the registry"; + + markGone(slaveId, goneTime); + })); + } else { + LOG(INFO) << "Transitioning agent " << slaveId << " to the DRAINED state"; + + registrar->apply(Owned<RegistryOperation>( + new MarkAgentDrained(slaveId))) + .onAny(defer( + self(), + [this, slaveId](const Future<bool>& result) { + CHECK_READY(result) + << "Failed to update draining info in the registry"; + + // This can happen if the agent sends an UnregisterSlaveMessage + // right before this method is called. + if (!slaves.draining.contains(slaveId)) { + LOG(INFO) + << "Agent " << slaveId << " was removed while being" + << " marked as DRAINED"; + return; + } + + slaves.draining[slaveId].set_state(DRAINED); + + LOG(INFO) + << "Agent " << slaveId << " successfully marked as DRAINED"; + })); + } +} + + void Master::reviveOffers( const UPID& from, const FrameworkID& frameworkId, @@ -6686,6 +6765,8 @@ void Master::acknowledge( send(slave->pid, message); metrics->valid_status_update_acknowledgements++; + + checkAndTransitionDrainingAgent(slave); } @@ -6817,6 +6898,8 @@ void Master::acknowledgeOperationStatus( send(slave->pid, message); metrics->valid_operation_status_update_acknowledgements++; + + checkAndTransitionDrainingAgent(slave); } @@ -8075,6 +8158,10 @@ void Master::___reregisterSlave( slaves.draining.at(slaveInfo.id()).config()); send(slave->pid, message); + + // Check if the agent is already drained and transition it + // appropriately if so. + checkAndTransitionDrainingAgent(slave); } // Inform the agent of the new framework pids for its tasks, and @@ -9471,6 +9558,8 @@ void Master::markGone(const SlaveID& slaveId, const TimeInfo& goneTime) } slaves.unreachable.erase(slaveId); + slaves.draining.erase(slaveId); + slaves.deactivated.erase(slaveId); // TODO(vinod): Consider moving these tasks into `completedTasks` by // transitioning them to a terminal state and sending status updates. @@ -11684,6 +11773,13 @@ void Master::_removeSlave( CHECK(machines[slave->machineId].slaves.contains(slave->id)); machines[slave->machineId].slaves.erase(slave->id); + // Remove any draining information about the agent. + // NOTE: This should not be mirrored by `__removeSlave` because that + // method handles both unreachable and gone agents. Unreachable agents + // will retain their draining information, but gone agents will not. + slaves.draining.erase(slave->id); + slaves.deactivated.erase(slave->id); + // Kill the slave observer. terminate(slave->observer); wait(slave->observer); diff --git a/src/master/master.hpp b/src/master/master.hpp index 205cdb2..ffa7423 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1108,6 +1108,12 @@ private: Framework* framework, const mesos::scheduler::Call::DeclineInverseOffers& decline); + // Should be called after each terminal task status update acknowledgement + // or terminal operation acknowledgement. If an agent is draining, this + // checks if all pending tasks or operations have terminated and then + // transitions the DRAINING agent to DRAINED. + void checkAndTransitionDrainingAgent(Slave* slave); + void revive( Framework* framework, const mesos::scheduler::Call::Revive& revive);
