This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 1bc52e7fd261e1092a5b9618e44977a936cb9d80 Author: Qian Zhang <[email protected]> AuthorDate: Fri Jan 3 16:31:53 2020 +0800 Set resource limits when updating executor container. Review: https://reviews.apache.org/r/71952 --- src/slave/slave.cpp | 135 ++++++++++++++++++++++++++++++++++++++-------------- src/slave/slave.hpp | 6 ++- 2 files changed, 104 insertions(+), 37 deletions(-) diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index f214560..6a48023 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -3347,15 +3347,23 @@ void Slave::__run( << " for executor " << *executor; const ContainerID& containerId = executor->containerId; - const Resources& resources = executor->allocatedResources(); - - publishResources(containerId, resources) - .then(defer(self(), [this, containerId, resources] { - // NOTE: The executor struct could have been removed before - // containerizer update, so we use the captured container ID and - // resources here. If this happens, the containerizer would simply - // skip updating a destroyed container. - return containerizer->update(containerId, resources); + const Resources& resourceRequests = executor->allocatedResources(); + const google::protobuf::Map<string, Value::Scalar>& resourceLimits = + computeExecutorLimits( + executor->info.resources(), + executor->queuedTasks.values(), + executor->launchedTasks.values()); + + publishResources(containerId, resourceRequests) + .then(defer( + self(), + [this, containerId, resourceRequests, resourceLimits] { + // NOTE: The executor struct could have been removed before + // containerizer update, so we use the captured container ID, + // resource requests and limits here. If this happens, the + // containerizer would simply skip updating a destroyed container. 
+ return containerizer->update( + containerId, resourceRequests, resourceLimits); })) .onAny(defer(self(), &Self::___run, @@ -5329,7 +5337,12 @@ void Slave::subscribe( } const ContainerID& containerId = executor->containerId; - const Resources& resources = executor->allocatedResources(); + const Resources& resourceRequests = executor->allocatedResources(); + const google::protobuf::Map<string, Value::Scalar>& resourceLimits = + computeExecutorLimits( + executor->info.resources(), + executor->queuedTasks.values(), + executor->launchedTasks.values()); Future<Nothing> resourcesPublished; if (executor->queuedTasks.empty()) { @@ -5343,16 +5356,19 @@ void Slave::subscribe( // after use. See comments in `publishResources` for details. resourcesPublished = Nothing(); } else { - resourcesPublished = publishResources(containerId, resources); + resourcesPublished = publishResources(containerId, resourceRequests); } resourcesPublished - .then(defer(self(), [this, containerId, resources] { - // NOTE: The executor struct could have been removed before - // containerizer update, so we use the captured container ID and - // resources here. If this happens, the containerizer would simply - // skip updating a destroyed container. - return containerizer->update(containerId, resources); + .then(defer( + self(), + [this, containerId, resourceRequests, resourceLimits] { + // NOTE: The executor struct could have been removed before + // containerizer update, so we use the captured container ID, + // resource requests and limits here. If this happens, the + // containerizer would simply skip updating a destroyed container. 
+ return containerizer->update( + containerId, resourceRequests, resourceLimits); })) .onAny(defer(self(), &Self::___run, @@ -5506,15 +5522,23 @@ void Slave::registerExecutor( } const ContainerID& containerId = executor->containerId; - const Resources& resources = executor->allocatedResources(); - - publishResources(containerId, resources) - .then(defer(self(), [this, containerId, resources] { - // NOTE: The executor struct could have been removed before - // containerizer update, so we use the captured container ID and - // resources here. If this happens, the containerizer would simply - // skip updating a destroyed container. - return containerizer->update(containerId, resources); + const Resources& resourceRequests = executor->allocatedResources(); + const google::protobuf::Map<string, Value::Scalar>& resourceLimits = + computeExecutorLimits( + executor->info.resources(), + executor->queuedTasks.values(), + executor->launchedTasks.values()); + + publishResources(containerId, resourceRequests) + .then(defer( + self(), + [this, containerId, resourceRequests, resourceLimits] { + // NOTE: The executor struct could have been removed before + // containerizer update, so we use the captured container ID, + // resource requests and limits here. If this happens, the + // containerizer would simply skip updating a destroyed container. + return containerizer->update( + containerId, resourceRequests, resourceLimits); })) .onAny(defer(self(), &Self::___run, @@ -5655,7 +5679,11 @@ void Slave::reregisterExecutor( // Tell the containerizer to update the resources. containerizer->update( executor->containerId, - executor->allocatedResources()) + executor->allocatedResources(), + computeExecutorLimits( + executor->info.resources(), + executor->queuedTasks.values(), + executor->launchedTasks.values())) .onAny(defer(self(), &Self::_reregisterExecutor, lambda::_1, @@ -6199,7 +6227,13 @@ void Slave::_statusUpdate( // have been updated before sending the status update. 
Note that // duplicate terminal updates are not possible here because they // lead to an error from `Executor::updateTaskState`. - containerizer->update(executor->containerId, executor->allocatedResources()) + containerizer->update( + executor->containerId, + executor->allocatedResources(), + computeExecutorLimits( + executor->info.resources(), + executor->queuedTasks.values(), + executor->launchedTasks.values())) .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, @@ -9980,17 +10014,46 @@ void Slave::initializeResourceProviderManager( google::protobuf::Map<string, Value::Scalar> Slave::computeExecutorLimits( const Resources& executorResources, - const vector<TaskInfo>& tasks) const + const vector<TaskInfo>& taskInfos, + const vector<Task*>& tasks) const { Option<Value::Scalar> executorCpuLimit, executorMemLimit; Value::Scalar cpuRequest, memRequest; - foreach (const TaskInfo& task, tasks) { + foreach (const TaskInfo& taskInfo, taskInfos) { + // Count the task's CPU limit into the executor's CPU limit. + if (taskInfo.limits().count("cpus")) { + setLimit(executorCpuLimit, taskInfo.limits().at("cpus")); + } else { + Option<Value::Scalar> taskCpus = + Resources(taskInfo.resources()).get<Value::Scalar>("cpus"); + + if (taskCpus.isSome()) { + cpuRequest += taskCpus.get(); + } + } + + // Count the task's memory limit into the executor's memory limit. + if (taskInfo.limits().count("mem")) { + setLimit(executorMemLimit, taskInfo.limits().at("mem")); + } else { + Option<Value::Scalar> taskMem = + Resources(taskInfo.resources()).get<Value::Scalar>("mem"); + + if (taskMem.isSome()) { + memRequest += taskMem.get(); + } + } + } + + foreach (const Task* task, tasks) { + CHECK_NOTNULL(task); + // Count the task's CPU limit into the executor's CPU limit. 
- if (task.limits().count("cpus")) { - setLimit(executorCpuLimit, task.limits().at("cpus")); + if (task->limits().count("cpus")) { + setLimit(executorCpuLimit, task->limits().at("cpus")); } else { Option<Value::Scalar> taskCpus = - Resources(task.resources()).get<Value::Scalar>("cpus"); + Resources(task->resources()).get<Value::Scalar>("cpus"); if (taskCpus.isSome()) { cpuRequest += taskCpus.get(); @@ -9998,11 +10061,11 @@ google::protobuf::Map<string, Value::Scalar> Slave::computeExecutorLimits( } // Count the task's memory limit into the executor's memory limit. - if (task.limits().count("mem")) { - setLimit(executorMemLimit, task.limits().at("mem")); + if (task->limits().count("mem")) { + setLimit(executorMemLimit, task->limits().at("mem")); } else { Option<Value::Scalar> taskMem = - Resources(task.resources()).get<Value::Scalar>("mem"); + Resources(task->resources()).get<Value::Scalar>("mem"); if (taskMem.isSome()) { memRequest += taskMem.get(); diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index a5a367e..d7e65e0 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -779,9 +779,13 @@ private: const Flags& flags, const SlaveID& slaveId); + // This function is used to compute limits for executors before they + // are launched as well as when updating running executors, so we must + // accept both `TaskInfo` and `Task` types to handle both cases. google::protobuf::Map<std::string, Value::Scalar> computeExecutorLimits( const Resources& executorResources, - const std::vector<TaskInfo>& tasks) const; + const std::vector<TaskInfo>& taskInfos, + const std::vector<Task*>& tasks = {}) const; protobuf::master::Capabilities requiredMasterCapabilities;
