Repository: mesos Updated Branches: refs/heads/1.5.x b4b82b977 -> 689760896
Fixed detaching task volume directories of destroyed frameworks. Review: https://reviews.apache.org/r/65231/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7b30b9cc Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7b30b9cc Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7b30b9cc Branch: refs/heads/1.5.x Commit: 7b30b9ccd63dbcd3375e012dae6e2ffb9dc6a79f Parents: b4b82b9 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> Authored: Fri Jan 19 11:08:49 2018 +0800 Committer: Qian Zhang <zhq527...@gmail.com> Committed: Fri Jan 19 11:15:54 2018 +0800 ---------------------------------------------------------------------- src/slave/slave.cpp | 258 +++++++++++++++++++++++++++-------------------- src/slave/slave.hpp | 52 +++++----- 2 files changed, 176 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7b30b9cc/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 956f79d..127b954 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1021,6 +1021,97 @@ Nothing Slave::detachFile(const string& path) } +void Slave::attachTaskVolumeDirectory( + const ExecutorInfo& executorInfo, + const ContainerID& executorContainerId, + const Task& task) +{ + CHECK(executorInfo.has_type() && + executorInfo.type() == ExecutorInfo::DEFAULT); + + CHECK_EQ(task.executor_id(), executorInfo.executor_id()); + + foreach (const Resource& resource, task.resources()) { + // Ignore if there are no disk resources or if the + // disk resources did not specify a volume mapping. 
+ if (!resource.has_disk() || !resource.disk().has_volume()) { + continue; + } + + const Volume& volume = resource.disk().volume(); + + const string executorDirectory = paths::getExecutorRunPath( + flags.work_dir, + info.id(), + task.framework_id(), + task.executor_id(), + executorContainerId); + + const string executorVolumePath = + path::join(executorDirectory, volume.container_path()); + + const string taskPath = paths::getTaskPath( + flags.work_dir, + info.id(), + task.framework_id(), + task.executor_id(), + executorContainerId, + task.task_id()); + + const string taskVolumePath = + path::join(taskPath, volume.container_path()); + + files->attach(executorVolumePath, taskVolumePath) + .onAny(defer( + self(), + &Self::fileAttached, + lambda::_1, + executorVolumePath, + taskVolumePath)); + } +} + + +void Slave::detachTaskVolumeDirectories( + const ExecutorInfo& executorInfo, + const ContainerID& executorContainerId, + const vector<Task>& tasks) +{ + // NOTE: If the executor is not a default executor, this function will + // still be called but with an empty list of tasks. + CHECK(tasks.empty() || + (executorInfo.has_type() && + executorInfo.type() == ExecutorInfo::DEFAULT)); + + foreach (const Task& task, tasks) { + CHECK_EQ(task.executor_id(), executorInfo.executor_id()); + + foreach (const Resource& resource, task.resources()) { + // Ignore if there are no disk resources or if the + // disk resources did not specify a volume mapping. 
+ if (!resource.has_disk() || !resource.disk().has_volume()) { + continue; + } + + const Volume& volume = resource.disk().volume(); + + const string taskPath = paths::getTaskPath( + flags.work_dir, + info.id(), + task.framework_id(), + task.executor_id(), + executorContainerId, + task.task_id()); + + const string taskVolumePath = + path::join(taskPath, volume.container_path()); + + files->detach(taskVolumePath); + } + } +} + + void Slave::detected(const Future<Option<MasterInfo>>& _master) { CHECK(state == DISCONNECTED || @@ -5868,26 +5959,34 @@ void Slave::removeExecutor(Framework* framework, Executor* executor) executor->id, executor->containerId); - os::utime(path); // Update the modification time. - garbageCollect(path) - .onAny(defer(self(), [=](const Future<Nothing>& future) { - detachFile(path); + // NOTE: We keep a list of default executor tasks here for + // detaching task volume directories, since the executor may + // already be destroyed when the GC completes (MESOS-8460). + vector<Task> defaultExecutorTasks; + if (executor->info.has_type() && + executor->info.type() == ExecutorInfo::DEFAULT) { + foreachvalue (const Task* task, executor->launchedTasks) { + defaultExecutorTasks.push_back(*task); + } - if (executor->info.has_type() && - executor->info.type() == ExecutorInfo::DEFAULT) { - foreachvalue (const Task* task, executor->launchedTasks) { - executor->detachTaskVolumeDirectory(*task); - } + foreachvalue (const Task* task, executor->terminatedTasks) { + defaultExecutorTasks.push_back(*task); + } - foreachvalue (const Task* task, executor->terminatedTasks) { - executor->detachTaskVolumeDirectory(*task); - } + foreach (const shared_ptr<Task>& task, executor->completedTasks) { + defaultExecutorTasks.push_back(*task); + } + } - foreach (const shared_ptr<Task>& task, executor->completedTasks) { - executor->detachTaskVolumeDirectory(*task); - } - } - })); + os::utime(path); // Update the modification time.
+ garbageCollect(path) + .onAny(defer(self(), &Self::detachFile, path)) + .onAny(defer( + self(), + &Self::detachTaskVolumeDirectories, + executor->info, + executor->containerId, + defaultExecutorTasks)); // Schedule the top level executor work directory, only if the // framework doesn't have any 'pending' tasks for this executor. @@ -5911,10 +6010,8 @@ void Slave::removeExecutor(Framework* framework, Executor* executor) os::utime(path); // Update the modification time. garbageCollect(path) - .onAny(defer(self(), [=](const Future<Nothing>& future) { - detachFile(latestPath); - detachFile(virtualLatestPath); - })); + .onAny(defer(self(), &Self::detachFile, latestPath)) + .onAny(defer(self(), &Self::detachFile, virtualLatestPath)); } if (executor->checkpoint) { @@ -8613,25 +8710,33 @@ void Framework::recoverExecutor( const string path = paths::getExecutorRunPath( slave->flags.work_dir, slave->info.id(), id(), state.id, runId); - slave->garbageCollect(path) - .onAny(defer(slave->self(), [=](const Future<Nothing>& future) { - slave->detachFile(path); + // NOTE: We keep a list of default executor tasks here for + // detaching task volume directories, since the executor may + // already be destroyed when the GC completes (MESOS-8460).
+ vector<Task> defaultExecutorTasks; + if (executor->info.has_type() && + executor->info.type() == ExecutorInfo::DEFAULT) { + foreachvalue (const Task* task, executor->launchedTasks) { + defaultExecutorTasks.push_back(*task); + } - if (executor->info.has_type() && - executor->info.type() == ExecutorInfo::DEFAULT) { - foreachvalue (const Task* task, executor->launchedTasks) { - executor->detachTaskVolumeDirectory(*task); - } + foreachvalue (const Task* task, executor->terminatedTasks) { + defaultExecutorTasks.push_back(*task); + } - foreachvalue (const Task* task, executor->terminatedTasks) { - executor->detachTaskVolumeDirectory(*task); - } + foreach (const shared_ptr<Task>& task, executor->completedTasks) { + defaultExecutorTasks.push_back(*task); + } + } - foreach (const shared_ptr<Task>& task, executor->completedTasks) { - executor->detachTaskVolumeDirectory(*task); - } - } - })); + slave->garbageCollect(path) + .onAny(defer(slave, &Slave::detachFile, path)) + .onAny(defer( + slave, + &Slave::detachTaskVolumeDirectories, + executor->info, + executor->containerId, + defaultExecutorTasks)); // GC the executor run's meta directory. slave->garbageCollect(paths::getExecutorRunPath( @@ -8640,10 +8745,8 @@ void Framework::recoverExecutor( // GC the top level executor work directory. slave->garbageCollect(paths::getExecutorPath( slave->flags.work_dir, slave->info.id(), id(), state.id)) - .onAny(defer(slave->self(), [=](const Future<Nothing>& future) { - slave->detachFile(latestPath); - slave->detachFile(virtualLatestPath); - })); + .onAny(defer(slave, &Slave::detachFile, latestPath)) + .onAny(defer(slave, &Slave::detachFile, virtualLatestPath)); // GC the top level executor meta directory. 
slave->garbageCollect(paths::getExecutorPath( @@ -8938,7 +9041,7 @@ Task* Executor::addLaunchedTask(const TaskInfo& task) launchedTasks[task.task_id()] = t; if (info.has_type() && info.type() == ExecutorInfo::DEFAULT) { - attachTaskVolumeDirectory(*t); + slave->attachTaskVolumeDirectory(info, containerId, *t); } return t; @@ -8960,7 +9063,7 @@ void Executor::completeTask(const TaskID& taskId) info.type() == ExecutorInfo::DEFAULT && completedTasks.full()) { const shared_ptr<Task>& firstTask = completedTasks.front(); - detachTaskVolumeDirectory(*firstTask); + slave->detachTaskVolumeDirectories(info, containerId, {*firstTask}); } Task* task = terminatedTasks[taskId]; @@ -9034,7 +9137,7 @@ void Executor::recoverTask(const TaskState& state, bool recheckpointTask) launchedTasks[state.id] = task; if (info.has_type() && info.type() == ExecutorInfo::DEFAULT) { - attachTaskVolumeDirectory(*task); + slave->attachTaskVolumeDirectory(info, containerId, *task); } // Read updates to get the latest state of the task. @@ -9142,73 +9245,6 @@ bool Executor::incompleteTasks() } -void Executor::attachTaskVolumeDirectory(const Task& task) -{ - CHECK(info.has_type() && info.type() == ExecutorInfo::DEFAULT); - - foreach (const Resource& resource, task.resources()) { - // Ignore if there are no disk resources or if the - // disk resources did not specify a volume mapping. 
- if (!resource.has_disk() || !resource.disk().has_volume()) { - continue; - } - - const Volume& volume = resource.disk().volume(); - - const string executorVolumePath = - path::join(directory, volume.container_path()); - - const string taskPath = paths::getTaskPath( - slave->flags.work_dir, - slave->info.id(), - frameworkId, - id, - containerId, - task.task_id()); - - const string taskVolumePath = - path::join(taskPath, volume.container_path()); - - slave->files->attach(executorVolumePath, taskVolumePath) - .onAny(defer( - slave, - &Slave::fileAttached, - lambda::_1, - executorVolumePath, - taskVolumePath)); - } -} - - -void Executor::detachTaskVolumeDirectory(const Task& task) -{ - CHECK(info.has_type() && info.type() == ExecutorInfo::DEFAULT); - - foreach (const Resource& resource, task.resources()) { - // Ignore if there are no disk resources or if the - // disk resources did not specify a volume mapping. - if (!resource.has_disk() || !resource.disk().has_volume()) { - continue; - } - - const Volume& volume = resource.disk().volume(); - - const string taskPath = paths::getTaskPath( - slave->flags.work_dir, - slave->info.id(), - frameworkId, - id, - containerId, - task.task_id()); - - const string taskVolumePath = - path::join(taskPath, volume.container_path()); - - slave->files->detach(taskVolumePath); - } -} - - bool Executor::isGeneratedForCommandTask() const { return isGeneratedForCommandTask_; http://git-wip-us.apache.org/repos/asf/mesos/blob/7b30b9cc/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index a07f046..09c01eb 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -409,6 +409,35 @@ public: Nothing detachFile(const std::string& path); + // TODO(qianzhang): This is a workaround to make the default executor + // task's volume directory visible in MESOS UI. 
In MESOS-7225, we made + // sure a task can access any volumes specified in its disk resources + // from its sandbox by introducing a workaround to the default executor, + // i.e., adding a `SANDBOX_PATH` volume with type `PARENT` to the + // corresponding nested container. This volume gets translated into a + // bind mount in the nested container's mount namespace, which is not + // visible in Mesos UI because it operates in the host namespace. See + // MESOS-8279 for details. + // + // To make the task's volume directory visible in Mesos UI, here we + // attach the executor's volume directory to it, so when users browse + // the task's volume directory in Mesos UI, what they actually browse is + // the executor's volume directory. Note that when calling + // `Files::attach()`, the third argument `authorized` is not specified + // because it is already specified when we do the attach for the + // executor's sandbox, and it also applies to the executor's tasks. + void attachTaskVolumeDirectory( + const ExecutorInfo& executorInfo, + const ContainerID& executorContainerId, + const Task& task); + + // TODO(qianzhang): Remove the task's volume directory from the /files + // endpoint. This is a workaround for MESOS-8279. + void detachTaskVolumeDirectories( + const ExecutorInfo& executorInfo, + const ContainerID& executorContainerId, + const std::vector<Task>& tasks); + + // Triggers a re-detection of the master when the slave does // not receive a ping. void pingTimeout(process::Future<Option<MasterInfo>> future); @@ -833,29 +862,6 @@ public: // Returns true if there are any queued/launched/terminated tasks. bool incompleteTasks(); - // TODO(qianzhang): This is a workaround to make the default executor - // task's volume directory visible in MESOS UI.
In MESOS-7225, we made - // sure a task can access any volumes specified in its disk resources - // from its sandbox by introducing a workaround to the default executor, - // i.e., adding a `SANDBOX_PATH` volume with type `PARENT` to the - // corresponding nested container. This volume gets translated into a - // bind mount in the nested container's mount namespace, which is is not - // visible in Mesos UI because it operates in the host namespace. See - // Mesos-8279 for details. - // - // To make the task's volume directory visible in Mesos UI, here we - // attach the executor's volume directory to it, so when users browse - // task's volume directory in Mesos UI, what they actually browse is the - // executor's volume directory. Note when calling `Files::attach()`, the - // third argument `authorized` is not specified because it is already - // specified when we do the attach for the executor's sandbox and it also - // applies to the executor's tasks. - void attachTaskVolumeDirectory(const Task& task); - - // TODO(qianzhang): Remove the task's volume directory from the /files - // endpoint. This is a workaround for MESOS-8279. - void detachTaskVolumeDirectory(const Task& task); - // Sends a message to the connected executor. template <typename Message> void send(const Message& message)