This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 23f40bed53490127b9e48bc32995c77504dcdc55 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Fri Mar 1 15:49:32 2019 -0800 Avoid dereferencing removed executors and launching containers for them. When launching executors and tasks, there is no guarantee that the executors still exist once the future returned by `Slave::publishResources` completes. If they do not, the executor struct must not be dereferenced and the executor containers must not be launched at all. NOTE: This patch causes `Slave::launchExecutor` to be called asynchronously even if there is no secret generator. However, this should not affect the correctness of executor launching. Review: https://reviews.apache.org/r/70084 --- src/slave/slave.cpp | 242 ++++++++++++++++++++++++++-------------------------- src/slave/slave.hpp | 12 ++- 2 files changed, 131 insertions(+), 123 deletions(-) diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 794a9c9..a3ea5d2 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2972,22 +2972,51 @@ void Slave::__run( executor = added.get(); - if (secretGenerator) { - generateSecret(framework->id(), executor->id, executor->containerId) - .onAny(defer( - self(), - &Self::launchExecutor, - lambda::_1, - frameworkId, - executorId, - taskGroup.isNone() ? task.get() : Option<TaskInfo>::none())); - } else { - Slave::launchExecutor( - None(), + // NOTE: We make a copy of the executor info because we may mutate it with + // some default fields and resources. + ExecutorInfo executorInfo_ = executorInfo; + + // Populate the command info for default executor. We modify the executor + // info to avoid resetting command info upon reregistering with the master + // since the master doesn't store them; they are generated by the slave.
+ if (executorInfo_.has_type() && + executorInfo_.type() == ExecutorInfo::DEFAULT) { + CHECK(!executorInfo_.has_command()); + + *executorInfo_.mutable_command() = + defaultExecutorCommandInfo(flags.launcher_dir, executor->user); + } + + // NOTE: We modify the ExecutorInfo to include the task's resources when + // launching the executor so that the containerizer has non-zero resources + // to work with when the executor has no resources. This should be revisited + // after MESOS-600. + if (task.isSome()) { + *executorInfo_.mutable_resources() = + Resources(executorInfo.resources()) + task->resources(); + } + + // Add the default container info to the executor info. + // TODO(jieyu): Rename the flag to be default_mesos_container_info. + if (!executorInfo_.has_container() && + flags.default_container_info.isSome()) { + *executorInfo_.mutable_container() = flags.default_container_info.get(); + } + + publishResources(executor->containerId, executorInfo_.resources()) + .then(defer( + self(), + &Self::generateSecret, frameworkId, executorId, - taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()); - } + executor->containerId)) + .onAny(defer( + self(), + &Self::launchExecutor, + lambda::_1, + frameworkId, + executorInfo_, + taskGroup.isNone() ? 
task.get() : Option<TaskInfo>::none())); } CHECK_NOTNULL(executor); @@ -3072,11 +3101,16 @@ void Slave::__run( LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup) << " for executor " << *executor; - publishResources(executor->containerId, executor->allocatedResources()) - .then(defer(self(), [=] { - return containerizer->update( - executor->containerId, - executor->allocatedResources()); + const ContainerID& containerId = executor->containerId; + const Resources& resources = executor->allocatedResources(); + + publishResources(containerId, resources) + .then(defer(self(), [this, containerId, resources] { + // NOTE: The executor struct could have been removed before + // containerizer update, so we use the captured container ID and + // resources here. If this happens, the containerizer would simply + // skip updating a destroyed container. + return containerizer->update(containerId, resources); })) .onAny(defer(self(), &Self::___run, @@ -3321,12 +3355,15 @@ void Slave::___run( } -// Generates a secret for executor authentication. -Future<Secret> Slave::generateSecret( +Future<Option<Secret>> Slave::generateSecret( const FrameworkID& frameworkId, const ExecutorID& executorId, const ContainerID& containerId) { + if (!secretGenerator) { + return None(); + } + Principal principal( Option<string>::none(), { @@ -3336,7 +3373,7 @@ Future<Secret> Slave::generateSecret( }); return secretGenerator->generate(principal) - .then([](const Secret& secret) -> Future<Secret> { + .then([](const Secret& secret) -> Future<Option<Secret>> { Option<Error> error = common::validation::validateSecret(secret); if (error.isSome()) { @@ -3356,31 +3393,31 @@ Future<Secret> Slave::generateSecret( // Launches an executor which was previously created. 
void Slave::launchExecutor( - const Option<Future<Secret>>& future, + const Future<Option<Secret>>& authenticationToken, const FrameworkID& frameworkId, - const ExecutorID& executorId, + const ExecutorInfo& executorInfo, const Option<TaskInfo>& taskInfo) { Framework* framework = getFramework(frameworkId); if (framework == nullptr) { - LOG(WARNING) << "Ignoring launching executor '" << executorId - << "' because the framework " << frameworkId - << " does not exist"; + LOG(WARNING) << "Ignoring launching executor '" + << executorInfo.executor_id() << "' because the framework " + << frameworkId << " does not exist"; return; } if (framework->state == Framework::TERMINATING) { - LOG(WARNING) << "Ignoring launching executor '" << executorId - << "' of framework " << frameworkId - << " because the framework is terminating"; + LOG(WARNING) << "Ignoring launching executor '" + << executorInfo.executor_id() << "' of framework " + << frameworkId << " because the framework is terminating"; return; } - Executor* executor = framework->getExecutor(executorId); + Executor* executor = framework->getExecutor(executorInfo.executor_id()); if (executor == nullptr) { - LOG(WARNING) << "Ignoring launching executor '" << executorId - << "' of framework " << frameworkId - << " because the executor does not exist"; + LOG(WARNING) << "Ignoring launching executor '" + << executorInfo.executor_id() << "' of framework " + << frameworkId << " because the executor does not exist"; return; } @@ -3406,78 +3443,38 @@ void Slave::launchExecutor( termination.set_reason(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED); termination.set_message("Executor " + executorState); - executorTerminated(frameworkId, executorId, termination); + executorTerminated(frameworkId, executor->id, termination); return; } CHECK_EQ(Executor::REGISTERING, executor->state); - Option<Secret> authenticationToken; - - if (future.isSome()) { - if (!future->isReady()) { - LOG(ERROR) << "Failed to launch executor " << *executor - << " 
in container " << executor->containerId - << " because secret generation failed: " - << (future->isFailed() ? future->failure() : "discarded"); + if (!authenticationToken.isReady()) { + const string message = "Secret generation failed: " + + (authenticationToken.isFailed() + ? authenticationToken.failure() : "future discarded"); - ContainerTermination termination; - termination.set_state(TASK_FAILED); - termination.set_reason(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED); - termination.set_message( - "Secret generation failed: " + - (future->isFailed() ? future->failure() : "discarded")); + LOG(ERROR) << "Failed to launch executor " << *executor << " in container " + << executor->containerId << ": " << message; - executorTerminated(frameworkId, executorId, termination); + ContainerTermination termination; + termination.set_state(TASK_FAILED); + termination.set_reason(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED); + termination.set_message(message); - return; - } + executorTerminated(frameworkId, executor->id, termination); - authenticationToken = future->get(); + return; } // Tell the containerizer to launch the executor. - // NOTE: We make a copy of the executor info because we may mutate - // it with some default fields and resources. - ExecutorInfo executorInfo_ = executor->info; - - // Populate the command info for default executor. We modify the ExecutorInfo - // to avoid resetting command info upon reregistering with the master since - // the master doesn't store them; they are generated by the slave. 
- if (executorInfo_.has_type() && - executorInfo_.type() == ExecutorInfo::DEFAULT) { - CHECK(!executorInfo_.has_command()); - - executorInfo_.mutable_command()->CopyFrom( - defaultExecutorCommandInfo(flags.launcher_dir, executor->user)); - } - - Resources resources = executorInfo_.resources(); - - // NOTE: We modify the ExecutorInfo to include the task's - // resources when launching the executor so that the containerizer - // has non-zero resources to work with when the executor has - // no resources. This should be revisited after MESOS-600. - if (taskInfo.isSome()) { - resources += taskInfo->resources(); - } - - executorInfo_.mutable_resources()->CopyFrom(resources); - - // Add the default container info to the executor info. - // TODO(jieyu): Rename the flag to be default_mesos_container_info. - if (!executorInfo_.has_container() && - flags.default_container_info.isSome()) { - executorInfo_.mutable_container()->CopyFrom( - flags.default_container_info.get()); - } // Bundle all the container launch fields together. ContainerConfig containerConfig; - containerConfig.mutable_executor_info()->CopyFrom(executorInfo_); - containerConfig.mutable_command_info()->CopyFrom(executorInfo_.command()); - containerConfig.mutable_resources()->CopyFrom(executorInfo_.resources()); + *containerConfig.mutable_executor_info() = executorInfo; + *containerConfig.mutable_command_info() = executorInfo.command(); + *containerConfig.mutable_resources() = executorInfo.resources(); containerConfig.set_directory(executor->directory); if (executor->user.isSome()) { @@ -3496,9 +3493,8 @@ void Slave::launchExecutor( // (2) If this is a non command task (e.g., default executor, custom // executor), the `ExecutorInfo.container` is what we want to // tell the containerizer anyway. 
- if (executorInfo_.has_container()) { - containerConfig.mutable_container_info() - ->CopyFrom(executorInfo_.container()); + if (executorInfo.has_container()) { + *containerConfig.mutable_container_info() = executorInfo.container(); } if (executor->isGeneratedForCommandTask()) { @@ -3511,11 +3507,11 @@ void Slave::launchExecutor( // Prepare environment variables for the executor. map<string, string> environment = executorEnvironment( flags, - executorInfo_, + executorInfo, executor->directory, info.id(), self(), - authenticationToken, + authenticationToken.get(), framework->info.checkpoint()); // Prepare the filename of the pidfile, for checkpoint-enabled frameworks. @@ -3534,20 +3530,18 @@ void Slave::launchExecutor( << "' of framework " << framework->id(); // Launch the container. - publishResources(executor->containerId, resources) - .then(defer(self(), [=] { - return containerizer->launch( - executor->containerId, - containerConfig, - environment, - pidCheckpointPath); - })) - .onAny(defer(self(), - &Self::executorLaunched, - frameworkId, - executor->id, - executor->containerId, - lambda::_1)); + // + // NOTE: This must be called synchronously to avoid launching a container for + // a removed executor. + containerizer->launch( + executor->containerId, containerConfig, environment, pidCheckpointPath) + .onAny(defer( + self(), + &Self::executorLaunched, + frameworkId, + executor->id, + executor->containerId, + lambda::_1)); // Make sure the executor registers within the given timeout. 
delay(flags.executor_registration_timeout, @@ -5027,11 +5021,16 @@ void Slave::subscribe( } } - publishResources(executor->containerId, executor->allocatedResources()) - .then(defer(self(), [=] { - return containerizer->update( - executor->containerId, - executor->allocatedResources()); + const ContainerID& containerId = executor->containerId; + const Resources& resources = executor->allocatedResources(); + + publishResources(containerId, resources) + .then(defer(self(), [this, containerId, resources] { + // NOTE: The executor struct could have been removed before + // containerizer update, so we use the captured container ID and + // resources here. If this happens, the containerizer would simply + // skip updating a destroyed container. + return containerizer->update(containerId, resources); })) .onAny(defer(self(), &Self::___run, @@ -5184,11 +5183,16 @@ void Slave::registerExecutor( } } - publishResources(executor->containerId, executor->allocatedResources()) - .then(defer(self(), [=] { - return containerizer->update( - executor->containerId, - executor->allocatedResources()); + const ContainerID& containerId = executor->containerId; + const Resources& resources = executor->allocatedResources(); + + publishResources(containerId, resources) + .then(defer(self(), [this, containerId, resources] { + // NOTE: The executor struct could have been removed before + // containerizer update, so we use the captured container ID and + // resources here. If this happens, the containerizer would simply + // skip updating a destroyed container. + return containerizer->update(containerId, resources); })) .onAny(defer(self(), &Self::___run, diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 2bffdc4..c740bf7 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -434,16 +434,20 @@ public: void finalize() override; void exited(const process::UPID& pid) override; - process::Future<Secret> generateSecret( + // Generates a secret for executor authentication. 
Returns None if there is + // no secret generator. + process::Future<Option<Secret>> generateSecret( const FrameworkID& frameworkId, const ExecutorID& executorId, const ContainerID& containerId); - // If an executor is launched for a task group, `taskInfo` would not be set. + // `executorInfo` is a mutated executor info with some default fields and + // resources. If an executor is launched for a task group, `taskInfo` would + // not be set. void launchExecutor( - const Option<process::Future<Secret>>& future, + const process::Future<Option<Secret>>& authorizationToken, const FrameworkID& frameworkId, - const ExecutorID& executorId, + const ExecutorInfo& executorInfo, const Option<TaskInfo>& taskInfo); void fileAttached(const process::Future<Nothing>& result,
