This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 9cce93413b691f7627491ac264022f7d15ead9cc
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Fri Mar 1 15:49:32 2019 -0800

    Avoid dereferencing removed executors and launching containers for them.
    
    When launching executors and tasks, there is no guarantee that the
    executors still remain after `Slave::publishResources` is returned. If
    not, the executor struct should not be dereferenced and the executor
    containers should not be launched at all.
    
    NOTE: The patch makes `Slave::launchExecutor` called asynchronously even
    if there is no secret generator. However this should not affect the
    correctness of executor launching.
    
    Review: https://reviews.apache.org/r/70084
---
 src/slave/slave.cpp | 242 ++++++++++++++++++++++++++--------------------------
 src/slave/slave.hpp |  12 ++-
 2 files changed, 131 insertions(+), 123 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 10af517..c0b5388 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2964,22 +2964,51 @@ void Slave::__run(
 
     executor = added.get();
 
-    if (secretGenerator) {
-      generateSecret(framework->id(), executor->id, executor->containerId)
-        .onAny(defer(
-              self(),
-              &Self::launchExecutor,
-              lambda::_1,
-              frameworkId,
-              executorId,
-              taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
-    } else {
-      Slave::launchExecutor(
-          None(),
+    // NOTE: We make a copy of the executor info because we may mutate it with
+    // some default fields and resources.
+    ExecutorInfo executorInfo_ = executorInfo;
+
+    // Populate the command info for default executor. We modify the executor
+    // info to avoid resetting command info upon reregistering with the master
+    // since the master doesn't store them; they are generated by the slave.
+    if (executorInfo_.has_type() &&
+        executorInfo_.type() == ExecutorInfo::DEFAULT) {
+      CHECK(!executorInfo_.has_command());
+
+      *executorInfo_.mutable_command() =
+        defaultExecutorCommandInfo(flags.launcher_dir, executor->user);
+    }
+
+    // NOTE: We modify the ExecutorInfo to include the task's resources when
+    // launching the executor so that the containerizer has non-zero resources
+    // to work with when the executor has no resources. This should be 
revisited
+    // after MESOS-600.
+    if (task.isSome()) {
+      *executorInfo_.mutable_resources() =
+        Resources(executorInfo.resources()) + task->resources();
+    }
+
+    // Add the default container info to the executor info.
+    // TODO(jieyu): Rename the flag to be default_mesos_container_info.
+    if (!executorInfo_.has_container() &&
+        flags.default_container_info.isSome()) {
+      *executorInfo_.mutable_container() = flags.default_container_info.get();
+    }
+
+    publishResources(executor->containerId, executorInfo_.resources())
+      .then(defer(
+          self(),
+          &Self::generateSecret,
           frameworkId,
           executorId,
-          taskGroup.isNone() ? task.get() : Option<TaskInfo>::none());
-    }
+          executor->containerId))
+      .onAny(defer(
+          self(),
+          &Self::launchExecutor,
+          lambda::_1,
+          frameworkId,
+          executorInfo_,
+          taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
   }
 
   CHECK_NOTNULL(executor);
@@ -3064,11 +3093,16 @@ void Slave::__run(
       LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
                 << " for executor " << *executor;
 
-      publishResources(executor->containerId, executor->allocatedResources())
-        .then(defer(self(), [=] {
-          return containerizer->update(
-              executor->containerId,
-              executor->allocatedResources());
+      const ContainerID& containerId = executor->containerId;
+      const Resources& resources = executor->allocatedResources();
+
+      publishResources(containerId, resources)
+        .then(defer(self(), [this, containerId, resources] {
+          // NOTE: The executor struct could have been removed before
+          // containerizer update, so we use the captured container ID and
+          // resources here. If this happens, the containerizer would simply
+          // skip updating a destroyed container.
+          return containerizer->update(containerId, resources);
         }))
         .onAny(defer(self(),
                      &Self::___run,
@@ -3313,12 +3347,15 @@ void Slave::___run(
 }
 
 
-// Generates a secret for executor authentication.
-Future<Secret> Slave::generateSecret(
+Future<Option<Secret>> Slave::generateSecret(
     const FrameworkID& frameworkId,
     const ExecutorID& executorId,
     const ContainerID& containerId)
 {
+  if (!secretGenerator) {
+    return None();
+  }
+
   Principal principal(
       Option<string>::none(),
       {
@@ -3328,7 +3365,7 @@ Future<Secret> Slave::generateSecret(
       });
 
   return secretGenerator->generate(principal)
-    .then([](const Secret& secret) -> Future<Secret> {
+    .then([](const Secret& secret) -> Future<Option<Secret>> {
       Option<Error> error = common::validation::validateSecret(secret);
 
       if (error.isSome()) {
@@ -3348,31 +3385,31 @@ Future<Secret> Slave::generateSecret(
 
 // Launches an executor which was previously created.
 void Slave::launchExecutor(
-    const Option<Future<Secret>>& future,
+    const Future<Option<Secret>>& authenticationToken,
     const FrameworkID& frameworkId,
-    const ExecutorID& executorId,
+    const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& taskInfo)
 {
   Framework* framework = getFramework(frameworkId);
   if (framework == nullptr) {
-    LOG(WARNING) << "Ignoring launching executor '" << executorId
-                 << "' because the framework " << frameworkId
-                 << " does not exist";
+    LOG(WARNING) << "Ignoring launching executor '"
+                 << executorInfo.executor_id() << "' because the framework "
+                 << frameworkId << " does not exist";
     return;
   }
 
   if (framework->state == Framework::TERMINATING) {
-    LOG(WARNING) << "Ignoring launching executor '" << executorId
-                 << "' of framework " << frameworkId
-                 << " because the framework is terminating";
+    LOG(WARNING) << "Ignoring launching executor '"
+                 << executorInfo.executor_id() << "' of framework "
+                 << frameworkId << " because the framework is terminating";
     return;
   }
 
-  Executor* executor = framework->getExecutor(executorId);
+  Executor* executor = framework->getExecutor(executorInfo.executor_id());
   if (executor == nullptr) {
-    LOG(WARNING) << "Ignoring launching executor '" << executorId
-                 << "' of framework " << frameworkId
-                 << " because the executor does not exist";
+    LOG(WARNING) << "Ignoring launching executor '"
+                 << executorInfo.executor_id() << "' of framework "
+                 << frameworkId << " because the executor does not exist";
     return;
   }
 
@@ -3398,78 +3435,38 @@ void Slave::launchExecutor(
     termination.set_reason(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED);
     termination.set_message("Executor " + executorState);
 
-    executorTerminated(frameworkId, executorId, termination);
+    executorTerminated(frameworkId, executor->id, termination);
 
     return;
   }
 
   CHECK_EQ(Executor::REGISTERING, executor->state);
 
-  Option<Secret> authenticationToken;
+  if (!authenticationToken.isReady()) {
+    const string message = "Secret generation failed: " +
+      (authenticationToken.isFailed()
+         ? authenticationToken.failure() : "future discarded");
 
-  if (future.isSome()) {
-    if (!future->isReady()) {
-      LOG(ERROR) << "Failed to launch executor " << *executor
-                 << " in container " << executor->containerId
-                 << " because secret generation failed: "
-                 << (future->isFailed() ? future->failure() : "discarded");
+    LOG(ERROR) << "Failed to launch executor " << *executor << " in container "
+               << executor->containerId << ": " << message;
 
-      ContainerTermination termination;
-      termination.set_state(TASK_FAILED);
-      termination.set_reason(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED);
-      termination.set_message(
-          "Secret generation failed: " +
-          (future->isFailed() ? future->failure() : "discarded"));
-
-      executorTerminated(frameworkId, executorId, termination);
+    ContainerTermination termination;
+    termination.set_state(TASK_FAILED);
+    termination.set_reason(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED);
+    termination.set_message(message);
 
-      return;
-    }
+    executorTerminated(frameworkId, executor->id, termination);
 
-    authenticationToken = future->get();
+    return;
   }
 
   // Tell the containerizer to launch the executor.
-  // NOTE: We make a copy of the executor info because we may mutate
-  // it with some default fields and resources.
-  ExecutorInfo executorInfo_ = executor->info;
-
-  // Populate the command info for default executor. We modify the ExecutorInfo
-  // to avoid resetting command info upon reregistering with the master since
-  // the master doesn't store them; they are generated by the slave.
-  if (executorInfo_.has_type() &&
-      executorInfo_.type() == ExecutorInfo::DEFAULT) {
-    CHECK(!executorInfo_.has_command());
-
-    executorInfo_.mutable_command()->CopyFrom(
-        defaultExecutorCommandInfo(flags.launcher_dir, executor->user));
-  }
-
-  Resources resources = executorInfo_.resources();
-
-  // NOTE: We modify the ExecutorInfo to include the task's
-  // resources when launching the executor so that the containerizer
-  // has non-zero resources to work with when the executor has
-  // no resources. This should be revisited after MESOS-600.
-  if (taskInfo.isSome()) {
-    resources += taskInfo->resources();
-  }
-
-  executorInfo_.mutable_resources()->CopyFrom(resources);
-
-  // Add the default container info to the executor info.
-  // TODO(jieyu): Rename the flag to be default_mesos_container_info.
-  if (!executorInfo_.has_container() &&
-      flags.default_container_info.isSome()) {
-    executorInfo_.mutable_container()->CopyFrom(
-        flags.default_container_info.get());
-  }
 
   // Bundle all the container launch fields together.
   ContainerConfig containerConfig;
-  containerConfig.mutable_executor_info()->CopyFrom(executorInfo_);
-  containerConfig.mutable_command_info()->CopyFrom(executorInfo_.command());
-  containerConfig.mutable_resources()->CopyFrom(executorInfo_.resources());
+  *containerConfig.mutable_executor_info() = executorInfo;
+  *containerConfig.mutable_command_info() = executorInfo.command();
+  *containerConfig.mutable_resources() = executorInfo.resources();
   containerConfig.set_directory(executor->directory);
 
   if (executor->user.isSome()) {
@@ -3488,9 +3485,8 @@ void Slave::launchExecutor(
   // (2) If this is a non command task (e.g., default executor, custom
   //     executor), the `ExecutorInfo.container` is what we want to
   //     tell the containerizer anyway.
-  if (executorInfo_.has_container()) {
-    containerConfig.mutable_container_info()
-      ->CopyFrom(executorInfo_.container());
+  if (executorInfo.has_container()) {
+    *containerConfig.mutable_container_info() = executorInfo.container();
   }
 
   if (executor->isGeneratedForCommandTask()) {
@@ -3503,11 +3499,11 @@ void Slave::launchExecutor(
   // Prepare environment variables for the executor.
   map<string, string> environment = executorEnvironment(
       flags,
-      executorInfo_,
+      executorInfo,
       executor->directory,
       info.id(),
       self(),
-      authenticationToken,
+      authenticationToken.get(),
       framework->info.checkpoint());
 
   // Prepare the filename of the pidfile, for checkpoint-enabled frameworks.
@@ -3526,20 +3522,18 @@ void Slave::launchExecutor(
             << "' of framework " << framework->id();
 
   // Launch the container.
-  publishResources(executor->containerId, resources)
-    .then(defer(self(), [=] {
-      return containerizer->launch(
-          executor->containerId,
-          containerConfig,
-          environment,
-          pidCheckpointPath);
-    }))
-    .onAny(defer(self(),
-                 &Self::executorLaunched,
-                 frameworkId,
-                 executor->id,
-                 executor->containerId,
-                 lambda::_1));
+  //
+  // NOTE: This must be called synchronously to avoid launching a container for
+  // a removed executor.
+  containerizer->launch(
+      executor->containerId, containerConfig, environment, pidCheckpointPath)
+    .onAny(defer(
+        self(),
+        &Self::executorLaunched,
+        frameworkId,
+        executor->id,
+        executor->containerId,
+        lambda::_1));
 
   // Make sure the executor registers within the given timeout.
   delay(flags.executor_registration_timeout,
@@ -4801,11 +4795,16 @@ void Slave::subscribe(
         }
       }
 
-      publishResources(executor->containerId, executor->allocatedResources())
-        .then(defer(self(), [=] {
-          return containerizer->update(
-              executor->containerId,
-              executor->allocatedResources());
+      const ContainerID& containerId = executor->containerId;
+      const Resources& resources = executor->allocatedResources();
+
+      publishResources(containerId, resources)
+        .then(defer(self(), [this, containerId, resources] {
+          // NOTE: The executor struct could have been removed before
+          // containerizer update, so we use the captured container ID and
+          // resources here. If this happens, the containerizer would simply
+          // skip updating a destroyed container.
+          return containerizer->update(containerId, resources);
         }))
         .onAny(defer(self(),
                      &Self::___run,
@@ -4958,11 +4957,16 @@ void Slave::registerExecutor(
         }
       }
 
-      publishResources(executor->containerId, executor->allocatedResources())
-        .then(defer(self(), [=] {
-          return containerizer->update(
-              executor->containerId,
-              executor->allocatedResources());
+      const ContainerID& containerId = executor->containerId;
+      const Resources& resources = executor->allocatedResources();
+
+      publishResources(containerId, resources)
+        .then(defer(self(), [this, containerId, resources] {
+          // NOTE: The executor struct could have been removed before
+          // containerizer update, so we use the captured container ID and
+          // resources here. If this happens, the containerizer would simply
+          // skip updating a destroyed container.
+          return containerizer->update(containerId, resources);
         }))
         .onAny(defer(self(),
                      &Self::___run,
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 2d5019d..8e5adc5 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -428,16 +428,20 @@ public:
   void finalize() override;
   void exited(const process::UPID& pid) override;
 
-  process::Future<Secret> generateSecret(
+  // Generates a secret for executor authentication. Returns None if there is
+  // no secret generator.
+  process::Future<Option<Secret>> generateSecret(
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,
       const ContainerID& containerId);
 
-  // If an executor is launched for a task group, `taskInfo` would not be set.
+  // `executorInfo` is a mutated executor info with some default fields and
+  // resources. If an executor is launched for a task group, `taskInfo` would
+  // not be set.
   void launchExecutor(
-      const Option<process::Future<Secret>>& future,
+      const process::Future<Option<Secret>>& authorizationToken,
       const FrameworkID& frameworkId,
-      const ExecutorID& executorId,
+      const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& taskInfo);
 
   void fileAttached(const process::Future<Nothing>& result,

Reply via email to