This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a7044bdcd91e467173bb44263658ac5c8df08d8c
Author: Benjamin Bannier <[email protected]>
AuthorDate: Mon Jul 15 10:26:18 2019 -0700

    Changed agent to fail task launches received during draining.
    
    With this patch the agent will now reject task launches while draining.
    While we do not expect the master to send task launches to draining
    agents it is still worthwhile to ensure no new tasks can be launched
    while draining. This invariant simplifies e.g., the handling of drain
    requests since we know that once the agent has entered a draining state
    we only need to terminate existing tasks and no new tasks can appear.
    
    Review: https://reviews.apache.org/r/70958/
---
 src/slave/slave.cpp | 74 +++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 52 insertions(+), 22 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 37385bd..eecd71e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2736,17 +2736,51 @@ void Slave::__run(
     CHECK(framework->removePendingTask(_task.task_id()));
   }
 
-  // Check task invariants.
+  // Check task launch invariants.
   //
   // TODO(bbannier): Instead of copy-pasting identical code to deal
   // with cases where tasks need to be terminated, consolidate code
   // below to decouple checking from terminating.
+  Option<string> kill = None();
+
+  // Fail the launch if the agent is draining.
+  if (drainConfig.isSome()) {
+    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+                 << " of framework " << frameworkId
+                 << " because the agent is draining";
+
+    kill = "Task was received while agent was already draining";
+  }
+
+  if (kill.isSome()) {
+    sendTaskDroppedUpdate(TaskStatus::REASON_SLAVE_DRAINING, *kill);
+
+    // Refer to the comment after 'framework->removePendingTask' above
+    // for why we need this.
+    if (framework->idle()) {
+      removeFramework(framework);
+    }
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects a new executor to be launched for this task(s).
+      // To keep the master executor entries updated, the agent needs to send
+      // `ExitedExecutorMessage` even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
+    }
+
+    return;
+  }
+
+  CHECK_NONE(kill);
 
   // If the master sent resource versions, perform a best-effort check
   // that they are consistent with the resources the task uses.
   //
   // TODO(bbannier): Also check executor resources.
-  bool kill = false;
   if (!resourceVersionUuids.empty()) {
     hashset<Option<ResourceProviderID>> usedResourceProviderIds;
     foreach (const TaskInfo& _task, tasks) {
@@ -2767,7 +2801,7 @@ void Slave::__run(
         CHECK(receivedResourceVersions.contains(None()));
 
         if (resourceVersion != receivedResourceVersions.at(None())) {
-          kill = true;
+          kill = "Task assumes outdated resource state";
         }
       } else {
         ResourceProvider* resourceProvider =
@@ -2776,16 +2810,14 @@ void Slave::__run(
         if (resourceProvider == nullptr ||
             resourceProvider->resourceVersion !=
               receivedResourceVersions.at(resourceProviderId.get())) {
-          kill = true;
+          kill = "Task assumes outdated resource state";
         }
       }
     }
   }
 
-  if (kill) {
-    sendTaskDroppedUpdate(
-        TaskStatus::REASON_INVALID_OFFERS,
-        "Task assumes outdated resource state");
+  if (kill.isSome()) {
+    sendTaskDroppedUpdate(TaskStatus::REASON_INVALID_OFFERS, *kill);
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2813,7 +2845,7 @@ void Slave::__run(
     return result;
   };
 
-  CHECK_EQ(kill, false);
+  CHECK_NONE(kill);
 
   // NOTE: If the task/task group or executor uses resources that are
   // checkpointed on the slave (e.g. persistent volumes), we should
@@ -2834,17 +2866,16 @@ void Slave::__run(
                      << " for task " << _task
                      << " of framework " << frameworkId;
 
-        kill = true;
+        kill =
+          "The checkpointed resources being used by the task or task group are 
"
+          "unknown to the agent";
         break;
       }
     }
   }
 
-  if (kill) {
-    sendTaskDroppedUpdate(
-        TaskStatus::REASON_RESOURCES_UNKNOWN,
-        "The checkpointed resources being used by the task or task group are "
-        "unknown to the agent");
+  if (kill.isSome()) {
+    sendTaskDroppedUpdate(TaskStatus::REASON_RESOURCES_UNKNOWN, *kill);
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
@@ -2866,7 +2897,7 @@ void Slave::__run(
     return;
   }
 
-  CHECK_EQ(kill, false);
+  CHECK_NONE(kill);
 
   // Refer to the comment above when looping across tasks on
   // why we need to unallocate resources.
@@ -2879,16 +2910,15 @@ void Slave::__run(
                    << " for executor '" << executorId
                    << "' of framework " << frameworkId;
 
-      kill = true;
+      kill =
+        "The checkpointed resources being used by the executor are unknown "
+        "to the agent";
       break;
     }
   }
 
-  if (kill) {
-    sendTaskDroppedUpdate(
-        TaskStatus::REASON_RESOURCES_UNKNOWN,
-        "The checkpointed resources being used by the executor are unknown "
-        "to the agent");
+  if (kill.isSome()) {
+    sendTaskDroppedUpdate(TaskStatus::REASON_RESOURCES_UNKNOWN, *kill);
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.

Reply via email to